HBASE-26213 Refactor AnnotationReadingPriorityFunction (#3614)
Signed-off-by: Xin Sun <ddupgs@gmail.com>
This commit is contained in:
parent
4b0a64033a
commit
1fb78a3302
hbase-server/src
main/java/org/apache/hadoop/hbase
ipc
master
regionserver
test/java/org/apache/hadoop/hbase
ipc
master
regionserver
|
@ -0,0 +1,135 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
||||||
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads special method annotations and table names to figure a priority for use by QoS facility in
|
||||||
|
* ipc; e.g: rpcs to hbase:meta get priority.
|
||||||
|
*/
|
||||||
|
// TODO: Remove. This is doing way too much work just to figure a priority. Do as Elliott
|
||||||
|
// suggests and just have the client specify a priority.
|
||||||
|
|
||||||
|
// The logic for figuring out high priority RPCs is as follows:
|
||||||
|
// 1. if the method is annotated with a QosPriority of QOS_HIGH,
|
||||||
|
// that is honored
|
||||||
|
// 2. parse out the protobuf message and see if the request is for meta
|
||||||
|
// region, and if so, treat it as a high priority RPC
|
||||||
|
// Some optimizations for (2) are done in the sub classes -
|
||||||
|
// Clients send the argument classname as part of making the RPC. The server
|
||||||
|
// decides whether to deserialize the proto argument message based on the
|
||||||
|
// pre-established set of argument classes (knownArgumentClasses below).
|
||||||
|
// This prevents the server from having to deserialize all proto argument
|
||||||
|
// messages prematurely.
|
||||||
|
// All the argument classes declare a 'getRegion' method that returns a
|
||||||
|
// RegionSpecifier object. Methods can be invoked on the returned object
|
||||||
|
// to figure out whether it is a meta region or not.
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public abstract class AnnotationReadingPriorityFunction<T extends RSRpcServices>
|
||||||
|
implements PriorityFunction {
|
||||||
|
|
||||||
|
protected final Map<String, Integer> annotatedQos;
|
||||||
|
// We need to mock the regionserver instance for some unit tests (set via
|
||||||
|
// setRegionServer method.
|
||||||
|
protected final T rpcServices;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs the priority function given the RPC server implementation and the annotations on the
|
||||||
|
* methods.
|
||||||
|
* @param rpcServices The RPC server implementation
|
||||||
|
*/
|
||||||
|
public AnnotationReadingPriorityFunction(final T rpcServices) {
|
||||||
|
Map<String, Integer> qosMap = new HashMap<>();
|
||||||
|
for (Method m : rpcServices.getClass().getMethods()) {
|
||||||
|
QosPriority p = m.getAnnotation(QosPriority.class);
|
||||||
|
if (p != null) {
|
||||||
|
// Since we protobuf'd, and then subsequently, when we went with pb style, method names
|
||||||
|
// are capitalized. This meant that this brittle compare of method names gotten by
|
||||||
|
// reflection no longer matched the method names coming in over pb.
|
||||||
|
// TODO: Get rid of this check. For now, workaround is to capitalize the names we got from
|
||||||
|
// reflection so they have chance of matching the pb ones.
|
||||||
|
String capitalizedMethodName = StringUtils.capitalize(m.getName());
|
||||||
|
qosMap.put(capitalizedMethodName, p.priority());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.rpcServices = rpcServices;
|
||||||
|
this.annotatedQos = qosMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a 'priority' based on the request type.
|
||||||
|
* <p/>
|
||||||
|
* Currently the returned priority is used for queue selection.
|
||||||
|
* <p/>
|
||||||
|
* See the {@code SimpleRpcScheduler} as example. It maintains a queue per 'priority type':
|
||||||
|
* <ul>
|
||||||
|
* <li>HIGH_QOS (meta requests)</li>
|
||||||
|
* <li>REPLICATION_QOS (replication requests)</li>
|
||||||
|
* <li>NORMAL_QOS (user requests).</li>
|
||||||
|
* </ul>
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int getPriority(RequestHeader header, Message param, User user) {
|
||||||
|
int priorityByAnnotation = getAnnotatedPriority(header);
|
||||||
|
|
||||||
|
if (priorityByAnnotation >= 0) {
|
||||||
|
return priorityByAnnotation;
|
||||||
|
}
|
||||||
|
if (param == null) {
|
||||||
|
return HConstants.NORMAL_QOS;
|
||||||
|
}
|
||||||
|
return getBasePriority(header, param);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* See if the method has an annotation.
|
||||||
|
* @return Return the priority from the annotation. If there isn't an annotation, this returns
|
||||||
|
* something below zero.
|
||||||
|
*/
|
||||||
|
protected int getAnnotatedPriority(RequestHeader header) {
|
||||||
|
String methodName = header.getMethodName();
|
||||||
|
Integer priorityByAnnotation = annotatedQos.get(methodName);
|
||||||
|
if (priorityByAnnotation != null) {
|
||||||
|
return normalizePriority(priorityByAnnotation);
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract int normalizePriority(int priority);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the priority for a given request from the header and the param.
|
||||||
|
* <p/>
|
||||||
|
* This doesn't consider which user is sending the request at all.
|
||||||
|
* <p/>
|
||||||
|
* This doesn't consider annotations
|
||||||
|
*/
|
||||||
|
protected abstract int getBasePriority(RequestHeader header, Message param);
|
||||||
|
}
|
|
@ -19,64 +19,59 @@ package org.apache.hadoop.hbase.master;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.regionserver.AnnotationReadingPriorityFunction;
|
import org.apache.hadoop.hbase.ipc.AnnotationReadingPriorityFunction;
|
||||||
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
|
||||||
import org.apache.hadoop.hbase.security.User;
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Priority function specifically for the master.
|
* Priority function specifically for the master.
|
||||||
*
|
* <p/>
|
||||||
* This doesn't make the super users always priority since that would make everything
|
* This doesn't make the super users always priority since that would make everything to the master
|
||||||
* to the master into high priority.
|
* into high priority.
|
||||||
*
|
* <p/>
|
||||||
* Specifically when reporting that a region is in transition master will try and edit the meta
|
* Specifically when reporting that a region is in transition master will try and edit the meta
|
||||||
* table. That edit will block the thread until successful. However if at the same time meta is
|
* table. That edit will block the thread until successful. However if at the same time meta is also
|
||||||
* also moving then we need to ensure that the regular region that's moving isn't blocking
|
* moving then we need to ensure that the regular region that's moving isn't blocking processing of
|
||||||
* processing of the request to online meta. To accomplish this this priority function makes sure
|
* the request to online meta. To accomplish this this priority function makes sure that all
|
||||||
* that all requests to transition meta are handled in different threads from other report region
|
* requests to transition meta are handled in different threads from other report region in
|
||||||
* in transition calls.
|
* transition calls.
|
||||||
|
* <p/>
|
||||||
* After HBASE-21754, ReportRegionStateTransitionRequest for meta region will be assigned a META_QOS
|
* After HBASE-21754, ReportRegionStateTransitionRequest for meta region will be assigned a META_QOS
|
||||||
* , a separate executor called metaTransitionExecutor will execute it. Other transition request
|
* , a separate executor called metaTransitionExecutor will execute it. Other transition request
|
||||||
* will be executed in priorityExecutor to prevent being mixed with normal requests
|
* will be executed in priorityExecutor to prevent being mixed with normal requests
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class MasterAnnotationReadingPriorityFunction extends AnnotationReadingPriorityFunction {
|
public class MasterAnnotationReadingPriorityFunction
|
||||||
|
extends AnnotationReadingPriorityFunction<MasterRpcServices> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We reference this value in SimpleRpcScheduler so this class have to be public instead of
|
||||||
|
* package private
|
||||||
|
*/
|
||||||
public static final int META_TRANSITION_QOS = 300;
|
public static final int META_TRANSITION_QOS = 300;
|
||||||
|
|
||||||
public MasterAnnotationReadingPriorityFunction(final RSRpcServices rpcServices) {
|
MasterAnnotationReadingPriorityFunction(MasterRpcServices rpcServices) {
|
||||||
this(rpcServices, rpcServices.getClass());
|
super(rpcServices);
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
public MasterAnnotationReadingPriorityFunction(RSRpcServices rpcServices,
|
|
||||||
Class<? extends RSRpcServices> clz) {
|
|
||||||
super(rpcServices, clz);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getPriority(RPCProtos.RequestHeader header, Message param, User user) {
|
protected int normalizePriority(int priority) {
|
||||||
// Yes this is copy pasted from the base class but it keeps from having to look in the
|
// no one can have higher priority than meta transition.
|
||||||
// annotatedQos table twice something that could get costly since this is called for
|
if (priority >= META_TRANSITION_QOS) {
|
||||||
// every single RPC request.
|
return META_TRANSITION_QOS - 1;
|
||||||
int priorityByAnnotation = getAnnotatedPriority(header);
|
} else {
|
||||||
if (priorityByAnnotation >= 0) {
|
return priority;
|
||||||
// no one can have higher priority than meta transition.
|
|
||||||
if (priorityByAnnotation >= META_TRANSITION_QOS) {
|
|
||||||
return META_TRANSITION_QOS - 1;
|
|
||||||
} else {
|
|
||||||
return priorityByAnnotation;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected int getBasePriority(RequestHeader header, Message param) {
|
||||||
// If meta is moving then all the other of reports of state transitions will be
|
// If meta is moving then all the other of reports of state transitions will be
|
||||||
// un able to edit meta. Those blocked reports should not keep the report that opens meta from
|
// un able to edit meta. Those blocked reports should not keep the report that opens meta from
|
||||||
// running. Hence all reports of meta transition should always be in a different thread.
|
// running. Hence all reports of meta transition should always be in a different thread.
|
||||||
|
@ -101,8 +96,15 @@ public class MasterAnnotationReadingPriorityFunction extends AnnotationReadingPr
|
||||||
if (param instanceof RegionServerStatusProtos.RegionServerReportRequest) {
|
if (param instanceof RegionServerStatusProtos.RegionServerReportRequest) {
|
||||||
return HConstants.HIGH_QOS;
|
return HConstants.HIGH_QOS;
|
||||||
}
|
}
|
||||||
|
// Trust the client-set priorities if set
|
||||||
|
if (header.hasPriority()) {
|
||||||
|
return header.getPriority();
|
||||||
|
}
|
||||||
|
return HConstants.NORMAL_QOS;
|
||||||
|
}
|
||||||
|
|
||||||
// Handle the rest of the different reasons to change priority.
|
@Override
|
||||||
return getBasePriority(header, param);
|
public long getDeadline(RequestHeader header, Message param) {
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,9 +22,7 @@ import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.ipc.PriorityFunction;
|
import org.apache.hadoop.hbase.ipc.AnnotationReadingPriorityFunction;
|
||||||
import org.apache.hadoop.hbase.ipc.QosPriority;
|
|
||||||
import org.apache.hadoop.hbase.security.User;
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -44,49 +42,22 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpeci
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reads special method annotations and table names to figure a priority for use by QoS facility in
|
* Priority function specifically for the region server.
|
||||||
* ipc; e.g: rpcs to hbase:meta get priority.
|
|
||||||
*/
|
*/
|
||||||
// TODO: Remove. This is doing way too much work just to figure a priority. Do as Elliott
|
|
||||||
// suggests and just have the client specify a priority.
|
|
||||||
|
|
||||||
//The logic for figuring out high priority RPCs is as follows:
|
|
||||||
//1. if the method is annotated with a QosPriority of QOS_HIGH,
|
|
||||||
// that is honored
|
|
||||||
//2. parse out the protobuf message and see if the request is for meta
|
|
||||||
// region, and if so, treat it as a high priority RPC
|
|
||||||
//Some optimizations for (2) are done here -
|
|
||||||
//Clients send the argument classname as part of making the RPC. The server
|
|
||||||
//decides whether to deserialize the proto argument message based on the
|
|
||||||
//pre-established set of argument classes (knownArgumentClasses below).
|
|
||||||
//This prevents the server from having to deserialize all proto argument
|
|
||||||
//messages prematurely.
|
|
||||||
//All the argument classes declare a 'getRegion' method that returns a
|
|
||||||
//RegionSpecifier object. Methods can be invoked on the returned object
|
|
||||||
//to figure out whether it is a meta region or not.
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class AnnotationReadingPriorityFunction implements PriorityFunction {
|
class RSAnnotationReadingPriorityFunction extends AnnotationReadingPriorityFunction<RSRpcServices> {
|
||||||
|
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(AnnotationReadingPriorityFunction.class.getName());
|
LoggerFactory.getLogger(RSAnnotationReadingPriorityFunction.class);
|
||||||
|
|
||||||
/** Used to control the scan delay, currently sqrt(numNextCall * weight) */
|
/** Used to control the scan delay, currently sqrt(numNextCall * weight) */
|
||||||
public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight";
|
public static final String SCAN_VTIME_WEIGHT_CONF_KEY = "hbase.ipc.server.scan.vtime.weight";
|
||||||
|
|
||||||
protected final Map<String, Integer> annotatedQos;
|
|
||||||
//We need to mock the regionserver instance for some unit tests (set via
|
|
||||||
//setRegionServer method.
|
|
||||||
private RSRpcServices rpcServices;
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private final Class<? extends Message>[] knownArgumentClasses = new Class[]{
|
private final Class<? extends Message>[] knownArgumentClasses =
|
||||||
GetRegionInfoRequest.class,
|
new Class[] { GetRegionInfoRequest.class, GetStoreFileRequest.class, CloseRegionRequest.class,
|
||||||
GetStoreFileRequest.class,
|
FlushRegionRequest.class, CompactRegionRequest.class, GetRequest.class, MutateRequest.class,
|
||||||
CloseRegionRequest.class,
|
ScanRequest.class };
|
||||||
FlushRegionRequest.class,
|
|
||||||
CompactRegionRequest.class,
|
|
||||||
GetRequest.class,
|
|
||||||
MutateRequest.class,
|
|
||||||
ScanRequest.class
|
|
||||||
};
|
|
||||||
|
|
||||||
// Some caches for helping performance
|
// Some caches for helping performance
|
||||||
private final Map<String, Class<? extends Message>> argumentToClassMap = new HashMap<>();
|
private final Map<String, Class<? extends Message>> argumentToClassMap = new HashMap<>();
|
||||||
|
@ -94,43 +65,8 @@ public class AnnotationReadingPriorityFunction implements PriorityFunction {
|
||||||
|
|
||||||
private final float scanVirtualTimeWeight;
|
private final float scanVirtualTimeWeight;
|
||||||
|
|
||||||
/**
|
RSAnnotationReadingPriorityFunction(RSRpcServices rpcServices) {
|
||||||
* Calls {@link #AnnotationReadingPriorityFunction(RSRpcServices, Class)} using the result of
|
super(rpcServices);
|
||||||
* {@code rpcServices#getClass()}
|
|
||||||
*
|
|
||||||
* @param rpcServices
|
|
||||||
* The RPC server implementation
|
|
||||||
*/
|
|
||||||
public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices) {
|
|
||||||
this(rpcServices, rpcServices.getClass());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Constructs the priority function given the RPC server implementation and the annotations on the
|
|
||||||
* methods in the provided {@code clz}.
|
|
||||||
*
|
|
||||||
* @param rpcServices
|
|
||||||
* The RPC server implementation
|
|
||||||
* @param clz
|
|
||||||
* The concrete RPC server implementation's class
|
|
||||||
*/
|
|
||||||
public AnnotationReadingPriorityFunction(final RSRpcServices rpcServices,
|
|
||||||
Class<? extends RSRpcServices> clz) {
|
|
||||||
Map<String,Integer> qosMap = new HashMap<>();
|
|
||||||
for (Method m : clz.getMethods()) {
|
|
||||||
QosPriority p = m.getAnnotation(QosPriority.class);
|
|
||||||
if (p != null) {
|
|
||||||
// Since we protobuf'd, and then subsequently, when we went with pb style, method names
|
|
||||||
// are capitalized. This meant that this brittle compare of method names gotten by
|
|
||||||
// reflection no longer matched the method names coming in over pb. TODO: Get rid of this
|
|
||||||
// check. For now, workaround is to capitalize the names we got from reflection so they
|
|
||||||
// have chance of matching the pb ones.
|
|
||||||
String capitalizedMethodName = capitalize(m.getName());
|
|
||||||
qosMap.put(capitalizedMethodName, p.priority());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
this.rpcServices = rpcServices;
|
|
||||||
this.annotatedQos = qosMap;
|
|
||||||
if (methodMap.get("getRegion") == null) {
|
if (methodMap.get("getRegion") == null) {
|
||||||
methodMap.put("hasRegion", new HashMap<>());
|
methodMap.put("hasRegion", new HashMap<>());
|
||||||
methodMap.put("getRegion", new HashMap<>());
|
methodMap.put("getRegion", new HashMap<>());
|
||||||
|
@ -149,91 +85,48 @@ public class AnnotationReadingPriorityFunction implements PriorityFunction {
|
||||||
scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f);
|
scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f);
|
||||||
}
|
}
|
||||||
|
|
||||||
private String capitalize(final String s) {
|
|
||||||
StringBuilder strBuilder = new StringBuilder(s);
|
|
||||||
strBuilder.setCharAt(0, Character.toUpperCase(strBuilder.charAt(0)));
|
|
||||||
return strBuilder.toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a 'priority' based on the request type.
|
|
||||||
*
|
|
||||||
* Currently the returned priority is used for queue selection.
|
|
||||||
* See the SimpleRpcScheduler as example. It maintains a queue per 'priory type'
|
|
||||||
* HIGH_QOS (meta requests), REPLICATION_QOS (replication requests),
|
|
||||||
* NORMAL_QOS (user requests).
|
|
||||||
*/
|
|
||||||
@Override
|
@Override
|
||||||
public int getPriority(RequestHeader header, Message param, User user) {
|
protected int normalizePriority(int priority) {
|
||||||
int priorityByAnnotation = getAnnotatedPriority(header);
|
return priority;
|
||||||
|
|
||||||
if (priorityByAnnotation >= 0) {
|
|
||||||
return priorityByAnnotation;
|
|
||||||
}
|
|
||||||
return getBasePriority(header, param);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Override
|
||||||
* See if the method has an annotation.
|
|
||||||
* @param header
|
|
||||||
* @return Return the priority from the annotation. If there isn't
|
|
||||||
* an annotation, this returns something below zero.
|
|
||||||
*/
|
|
||||||
protected int getAnnotatedPriority(RequestHeader header) {
|
|
||||||
String methodName = header.getMethodName();
|
|
||||||
Integer priorityByAnnotation = annotatedQos.get(methodName);
|
|
||||||
if (priorityByAnnotation != null) {
|
|
||||||
return priorityByAnnotation;
|
|
||||||
}
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the priority for a given request from the header and the param
|
|
||||||
* This doesn't consider which user is sending the request at all.
|
|
||||||
* This doesn't consider annotations
|
|
||||||
*/
|
|
||||||
protected int getBasePriority(RequestHeader header, Message param) {
|
protected int getBasePriority(RequestHeader header, Message param) {
|
||||||
if (param == null) {
|
|
||||||
return HConstants.NORMAL_QOS;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Trust the client-set priorities if set
|
// Trust the client-set priorities if set
|
||||||
if (header.hasPriority()) {
|
if (header.hasPriority()) {
|
||||||
return header.getPriority();
|
return header.getPriority();
|
||||||
}
|
}
|
||||||
|
|
||||||
String cls = param.getClass().getName();
|
String cls = param.getClass().getName();
|
||||||
Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
|
Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
|
||||||
RegionSpecifier regionSpecifier = null;
|
RegionSpecifier regionSpecifier = null;
|
||||||
//check whether the request has reference to meta region or now.
|
// check whether the request has reference to meta region or now.
|
||||||
try {
|
try {
|
||||||
// Check if the param has a region specifier; the pb methods are hasRegion and getRegion if
|
// Check if the param has a region specifier; the pb methods are hasRegion and getRegion if
|
||||||
// hasRegion returns true. Not all listed methods have region specifier each time. For
|
// hasRegion returns true. Not all listed methods have region specifier each time. For
|
||||||
// example, the ScanRequest has it on setup but thereafter relies on the scannerid rather than
|
// example, the ScanRequest has it on setup but thereafter relies on the scannerid rather than
|
||||||
// send the region over every time.
|
// send the region over every time.
|
||||||
Method hasRegion = methodMap.get("hasRegion").get(rpcArgClass);
|
Method hasRegion = methodMap.get("hasRegion").get(rpcArgClass);
|
||||||
if (hasRegion != null && (Boolean)hasRegion.invoke(param, (Object[])null)) {
|
if (hasRegion != null && (Boolean) hasRegion.invoke(param, (Object[]) null)) {
|
||||||
Method getRegion = methodMap.get("getRegion").get(rpcArgClass);
|
Method getRegion = methodMap.get("getRegion").get(rpcArgClass);
|
||||||
regionSpecifier = (RegionSpecifier)getRegion.invoke(param, (Object[])null);
|
regionSpecifier = (RegionSpecifier) getRegion.invoke(param, (Object[]) null);
|
||||||
Region region = rpcServices.getRegion(regionSpecifier);
|
Region region = rpcServices.getRegion(regionSpecifier);
|
||||||
if (region.getRegionInfo().getTable().isSystemTable()) {
|
if (region.getRegionInfo().getTable().isSystemTable()) {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("High priority because region=" +
|
LOG.trace(
|
||||||
region.getRegionInfo().getRegionNameAsString());
|
"High priority because region=" + region.getRegionInfo().getRegionNameAsString());
|
||||||
}
|
}
|
||||||
return HConstants.SYSTEMTABLE_QOS;
|
return HConstants.SYSTEMTABLE_QOS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
// Not good throwing an exception out of here, a runtime anyways. Let the query go into the
|
// Not good throwing an exception out of here, a runtime anyways. Let the query go into the
|
||||||
// server and have it throw the exception if still an issue. Just mark it normal priority.
|
// server and have it throw the exception if still an issue. Just mark it normal priority.
|
||||||
if (LOG.isTraceEnabled()) LOG.trace("Marking normal priority after getting exception=" + ex);
|
if (LOG.isTraceEnabled()) LOG.trace("Marking normal priority after getting exception=" + ex);
|
||||||
return HConstants.NORMAL_QOS;
|
return HConstants.NORMAL_QOS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (param instanceof ScanRequest) { // scanner methods...
|
if (param instanceof ScanRequest) { // scanner methods...
|
||||||
ScanRequest request = (ScanRequest)param;
|
ScanRequest request = (ScanRequest) param;
|
||||||
if (!request.hasScannerId()) {
|
if (!request.hasScannerId()) {
|
||||||
return HConstants.NORMAL_QOS;
|
return HConstants.NORMAL_QOS;
|
||||||
}
|
}
|
||||||
|
@ -252,15 +145,12 @@ public class AnnotationReadingPriorityFunction implements PriorityFunction {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Based on the request content, returns the deadline of the request.
|
* Based on the request content, returns the deadline of the request.
|
||||||
*
|
|
||||||
* @param header
|
|
||||||
* @param param
|
|
||||||
* @return Deadline of this request. 0 now, otherwise msec of 'delay'
|
* @return Deadline of this request. 0 now, otherwise msec of 'delay'
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public long getDeadline(RequestHeader header, Message param) {
|
public long getDeadline(RequestHeader header, Message param) {
|
||||||
if (param instanceof ScanRequest) {
|
if (param instanceof ScanRequest) {
|
||||||
ScanRequest request = (ScanRequest)param;
|
ScanRequest request = (ScanRequest) param;
|
||||||
if (!request.hasScannerId()) {
|
if (!request.hasScannerId()) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -273,8 +163,4 @@ public class AnnotationReadingPriorityFunction implements PriorityFunction {
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setRegionServer(final HRegionServer hrs) {
|
|
||||||
this.rpcServices = hrs.getRSRpcServices();
|
|
||||||
}
|
|
||||||
}
|
}
|
|
@ -1331,7 +1331,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
}
|
}
|
||||||
|
|
||||||
protected PriorityFunction createPriority() {
|
protected PriorityFunction createPriority() {
|
||||||
return new AnnotationReadingPriorityFunction(this);
|
return new RSAnnotationReadingPriorityFunction(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void requirePermission(String request, Permission.Action perm) throws IOException {
|
protected void requirePermission(String request, Permission.Action perm) throws IOException {
|
||||||
|
|
|
@ -15,27 +15,29 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase;
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.AnnotationReadingPriorityFunction;
|
|
||||||
import org.apache.hadoop.hbase.security.User;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
public class QosTestHelper {
|
import org.apache.hadoop.conf.Configuration;
|
||||||
protected void checkMethod(Configuration conf, final String methodName, final int expected,
|
import org.apache.hadoop.hbase.security.User;
|
||||||
final AnnotationReadingPriorityFunction qosf) {
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
|
||||||
|
|
||||||
|
public class QosTestBase {
|
||||||
|
|
||||||
|
protected final void checkMethod(Configuration conf, final String methodName, final int expected,
|
||||||
|
final AnnotationReadingPriorityFunction<?> qosf) {
|
||||||
checkMethod(conf, methodName, expected, qosf, null);
|
checkMethod(conf, methodName, expected, qosf, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void checkMethod(Configuration conf, final String methodName, final int expected,
|
protected final void checkMethod(Configuration conf, final String methodName, final int expected,
|
||||||
final AnnotationReadingPriorityFunction qosf, final Message param) {
|
final AnnotationReadingPriorityFunction<?> qosf, final Message param) {
|
||||||
RPCProtos.RequestHeader.Builder builder = RPCProtos.RequestHeader.newBuilder();
|
RPCProtos.RequestHeader.Builder builder = RPCProtos.RequestHeader.newBuilder();
|
||||||
builder.setMethodName(methodName);
|
builder.setMethodName(methodName);
|
||||||
assertEquals(methodName, expected, qosf.getPriority(builder.build(), param,
|
assertEquals(methodName, expected, qosf.getPriority(builder.build(), param,
|
||||||
User.createUserForTesting(conf, "someuser", new String[]{"somegroup"})));
|
User.createUserForTesting(conf, "someuser", new String[] { "somegroup" })));
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -24,12 +24,10 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.QosTestHelper;
|
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
import org.apache.hadoop.hbase.regionserver.AnnotationReadingPriorityFunction;
|
import org.apache.hadoop.hbase.ipc.QosTestBase;
|
||||||
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
@ -44,15 +42,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos;
|
||||||
|
|
||||||
@Category({MasterTests.class, SmallTests.class})
|
@Category({MasterTests.class, SmallTests.class})
|
||||||
public class TestMasterQosFunction extends QosTestHelper {
|
public class TestMasterQosFunction extends QosTestBase {
|
||||||
|
|
||||||
@ClassRule
|
@ClassRule
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
HBaseClassTestRule.forClass(TestMasterQosFunction.class);
|
HBaseClassTestRule.forClass(TestMasterQosFunction.class);
|
||||||
|
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private RSRpcServices rpcServices;
|
private MasterRpcServices rpcServices;
|
||||||
private AnnotationReadingPriorityFunction qosFunction;
|
private MasterAnnotationReadingPriorityFunction qosFunction;
|
||||||
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
@ -60,7 +58,7 @@ public class TestMasterQosFunction extends QosTestHelper {
|
||||||
conf = HBaseConfiguration.create();
|
conf = HBaseConfiguration.create();
|
||||||
rpcServices = Mockito.mock(MasterRpcServices.class);
|
rpcServices = Mockito.mock(MasterRpcServices.class);
|
||||||
when(rpcServices.getConfiguration()).thenReturn(conf);
|
when(rpcServices.getConfiguration()).thenReturn(conf);
|
||||||
qosFunction = new MasterAnnotationReadingPriorityFunction(rpcServices, MasterRpcServices.class);
|
qosFunction = new MasterAnnotationReadingPriorityFunction(rpcServices);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -18,25 +18,25 @@
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
import org.apache.hadoop.hbase.ipc.PriorityFunction;
|
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.mockito.Mockito;
|
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
|
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
||||||
|
@ -51,44 +51,28 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader
|
||||||
/**
|
/**
|
||||||
* Tests that verify certain RPCs get a higher QoS.
|
* Tests that verify certain RPCs get a higher QoS.
|
||||||
*/
|
*/
|
||||||
@Category({RegionServerTests.class, MediumTests.class})
|
@Category({ RegionServerTests.class, SmallTests.class })
|
||||||
public class TestPriorityRpc {
|
public class TestPriorityRpc {
|
||||||
|
|
||||||
@ClassRule
|
@ClassRule
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
HBaseClassTestRule.forClass(TestPriorityRpc.class);
|
HBaseClassTestRule.forClass(TestPriorityRpc.class);
|
||||||
|
|
||||||
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
|
private static Configuration CONF = HBaseConfiguration.create();
|
||||||
|
|
||||||
private static HRegionServer RS = null;
|
|
||||||
private static PriorityFunction PRIORITY = null;
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void setUp() throws Exception {
|
|
||||||
UTIL.startMiniCluster(1);
|
|
||||||
RS = UTIL.getHBaseCluster().getRegionServer(0);
|
|
||||||
PRIORITY = RS.rpcServices.getPriority();
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void tearDown() throws IOException {
|
|
||||||
UTIL.shutdownMiniCluster();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQosFunctionForMeta() throws IOException {
|
public void testQosFunctionForMeta() throws IOException {
|
||||||
PRIORITY = RS.rpcServices.getPriority();
|
|
||||||
RequestHeader.Builder headerBuilder = RequestHeader.newBuilder();
|
RequestHeader.Builder headerBuilder = RequestHeader.newBuilder();
|
||||||
//create a rpc request that has references to hbase:meta region and also
|
// create a rpc request that has references to hbase:meta region and also
|
||||||
//uses one of the known argument classes (known argument classes are
|
// uses one of the known argument classes (known argument classes are
|
||||||
//listed in HRegionServer.QosFunctionImpl.knownArgumentClasses)
|
// listed in HRegionServer.QosFunctionImpl.knownArgumentClasses)
|
||||||
headerBuilder.setMethodName("foo");
|
headerBuilder.setMethodName("foo");
|
||||||
|
|
||||||
GetRequest.Builder getRequestBuilder = GetRequest.newBuilder();
|
GetRequest.Builder getRequestBuilder = GetRequest.newBuilder();
|
||||||
RegionSpecifier.Builder regionSpecifierBuilder = RegionSpecifier.newBuilder();
|
RegionSpecifier.Builder regionSpecifierBuilder = RegionSpecifier.newBuilder();
|
||||||
regionSpecifierBuilder.setType(RegionSpecifierType.REGION_NAME);
|
regionSpecifierBuilder.setType(RegionSpecifierType.REGION_NAME);
|
||||||
ByteString name = UnsafeByteOperations.unsafeWrap(
|
ByteString name =
|
||||||
RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName());
|
UnsafeByteOperations.unsafeWrap(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName());
|
||||||
regionSpecifierBuilder.setValue(name);
|
regionSpecifierBuilder.setValue(name);
|
||||||
RegionSpecifier regionSpecifier = regionSpecifierBuilder.build();
|
RegionSpecifier regionSpecifier = regionSpecifierBuilder.build();
|
||||||
getRequestBuilder.setRegion(regionSpecifier);
|
getRequestBuilder.setRegion(regionSpecifier);
|
||||||
|
@ -97,31 +81,32 @@ public class TestPriorityRpc {
|
||||||
getRequestBuilder.setGet(getBuilder.build());
|
getRequestBuilder.setGet(getBuilder.build());
|
||||||
GetRequest getRequest = getRequestBuilder.build();
|
GetRequest getRequest = getRequestBuilder.build();
|
||||||
RequestHeader header = headerBuilder.build();
|
RequestHeader header = headerBuilder.build();
|
||||||
HRegion mockRegion = Mockito.mock(HRegion.class);
|
HRegion mockRegion = mock(HRegion.class);
|
||||||
HRegionServer mockRS = Mockito.mock(HRegionServer.class);
|
RSRpcServices mockRpc = mock(RSRpcServices.class);
|
||||||
RSRpcServices mockRpc = Mockito.mock(RSRpcServices.class);
|
when(mockRpc.getConfiguration()).thenReturn(CONF);
|
||||||
Mockito.when(mockRS.getRSRpcServices()).thenReturn(mockRpc);
|
RegionInfo mockRegionInfo = mock(RegionInfo.class);
|
||||||
RegionInfo mockRegionInfo = Mockito.mock(RegionInfo.class);
|
when(mockRpc.getRegion(any())).thenReturn(mockRegion);
|
||||||
Mockito.when(mockRpc.getRegion(Mockito.any())).thenReturn(mockRegion);
|
when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
|
||||||
Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
|
when(mockRegionInfo.getTable()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO.getTable());
|
||||||
Mockito.when(mockRegionInfo.getTable())
|
|
||||||
.thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO.getTable());
|
RSAnnotationReadingPriorityFunction qosFunc = new RSAnnotationReadingPriorityFunction(mockRpc);
|
||||||
// Presume type.
|
assertEquals(HConstants.SYSTEMTABLE_QOS,
|
||||||
((AnnotationReadingPriorityFunction)PRIORITY).setRegionServer(mockRS);
|
qosFunc.getPriority(header, getRequest, createSomeUser()));
|
||||||
assertEquals(
|
|
||||||
HConstants.SYSTEMTABLE_QOS, PRIORITY.getPriority(header, getRequest, createSomeUser()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQosFunctionWithoutKnownArgument() throws IOException {
|
public void testQosFunctionWithoutKnownArgument() throws IOException {
|
||||||
//The request is not using any of the
|
// The request is not using any of the
|
||||||
//known argument classes (it uses one random request class)
|
// known argument classes (it uses one random request class)
|
||||||
//(known argument classes are listed in
|
// (known argument classes are listed in
|
||||||
//HRegionServer.QosFunctionImpl.knownArgumentClasses)
|
// HRegionServer.QosFunctionImpl.knownArgumentClasses)
|
||||||
RequestHeader.Builder headerBuilder = RequestHeader.newBuilder();
|
RequestHeader.Builder headerBuilder = RequestHeader.newBuilder();
|
||||||
headerBuilder.setMethodName("foo");
|
headerBuilder.setMethodName("foo");
|
||||||
RequestHeader header = headerBuilder.build();
|
RequestHeader header = headerBuilder.build();
|
||||||
PriorityFunction qosFunc = RS.rpcServices.getPriority();
|
RSRpcServices mockRpc = mock(RSRpcServices.class);
|
||||||
|
when(mockRpc.getConfiguration()).thenReturn(CONF);
|
||||||
|
|
||||||
|
RSAnnotationReadingPriorityFunction qosFunc = new RSAnnotationReadingPriorityFunction(mockRpc);
|
||||||
assertEquals(HConstants.NORMAL_QOS, qosFunc.getPriority(header, null, createSomeUser()));
|
assertEquals(HConstants.NORMAL_QOS, qosFunc.getPriority(header, null, createSomeUser()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,55 +116,44 @@ public class TestPriorityRpc {
|
||||||
headerBuilder.setMethodName("Scan");
|
headerBuilder.setMethodName("Scan");
|
||||||
RequestHeader header = headerBuilder.build();
|
RequestHeader header = headerBuilder.build();
|
||||||
|
|
||||||
//build an empty scan request
|
// build an empty scan request
|
||||||
ScanRequest.Builder scanBuilder = ScanRequest.newBuilder();
|
ScanRequest.Builder scanBuilder = ScanRequest.newBuilder();
|
||||||
ScanRequest scanRequest = scanBuilder.build();
|
ScanRequest scanRequest = scanBuilder.build();
|
||||||
HRegion mockRegion = Mockito.mock(HRegion.class);
|
HRegion mockRegion = mock(HRegion.class);
|
||||||
HRegionServer mockRS = Mockito.mock(HRegionServer.class);
|
RSRpcServices mockRpc = mock(RSRpcServices.class);
|
||||||
RSRpcServices mockRpc = Mockito.mock(RSRpcServices.class);
|
when(mockRpc.getConfiguration()).thenReturn(CONF);
|
||||||
Mockito.when(mockRS.getRSRpcServices()).thenReturn(mockRpc);
|
RegionInfo mockRegionInfo = mock(RegionInfo.class);
|
||||||
RegionInfo mockRegionInfo = Mockito.mock(RegionInfo.class);
|
when(mockRpc.getRegion(any())).thenReturn(mockRegion);
|
||||||
Mockito.when(mockRpc.getRegion(Mockito.any())).thenReturn(mockRegion);
|
when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
|
||||||
Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
|
|
||||||
// make isSystemTable return false
|
// make isSystemTable return false
|
||||||
Mockito.when(mockRegionInfo.getTable())
|
when(mockRegionInfo.getTable()).thenReturn(TableName.valueOf("testQosFunctionForScanMethod"));
|
||||||
.thenReturn(TableName.valueOf("testQosFunctionForScanMethod"));
|
|
||||||
// Presume type.
|
RSAnnotationReadingPriorityFunction qosFunc = new RSAnnotationReadingPriorityFunction(mockRpc);
|
||||||
((AnnotationReadingPriorityFunction)PRIORITY).setRegionServer(mockRS);
|
final int qos = qosFunc.getPriority(header, scanRequest, createSomeUser());
|
||||||
final int qos = PRIORITY.getPriority(header, scanRequest, createSomeUser());
|
|
||||||
assertEquals(Integer.toString(qos), qos, HConstants.NORMAL_QOS);
|
assertEquals(Integer.toString(qos), qos, HConstants.NORMAL_QOS);
|
||||||
|
|
||||||
//build a scan request with scannerID
|
// build a scan request with scannerID
|
||||||
scanBuilder = ScanRequest.newBuilder();
|
scanBuilder = ScanRequest.newBuilder();
|
||||||
scanBuilder.setScannerId(12345);
|
scanBuilder.setScannerId(12345);
|
||||||
scanRequest = scanBuilder.build();
|
scanRequest = scanBuilder.build();
|
||||||
//mock out a high priority type handling and see the QoS returned
|
// mock out a high priority type handling and see the QoS returned
|
||||||
RegionScanner mockRegionScanner = Mockito.mock(RegionScanner.class);
|
RegionScanner mockRegionScanner = mock(RegionScanner.class);
|
||||||
Mockito.when(mockRpc.getScanner(12345)).thenReturn(mockRegionScanner);
|
when(mockRpc.getScanner(12345)).thenReturn(mockRegionScanner);
|
||||||
Mockito.when(mockRegionScanner.getRegionInfo()).thenReturn(mockRegionInfo);
|
when(mockRegionScanner.getRegionInfo()).thenReturn(mockRegionInfo);
|
||||||
Mockito.when(mockRpc.getRegion((RegionSpecifier)Mockito.any())).thenReturn(mockRegion);
|
when(mockRpc.getRegion((RegionSpecifier) any())).thenReturn(mockRegion);
|
||||||
Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
|
when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
|
||||||
Mockito.when(mockRegionInfo.getTable())
|
when(mockRegionInfo.getTable()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO.getTable());
|
||||||
.thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO.getTable());
|
|
||||||
|
|
||||||
// Presume type.
|
assertEquals(HConstants.SYSTEMTABLE_QOS,
|
||||||
((AnnotationReadingPriorityFunction)PRIORITY).setRegionServer(mockRS);
|
qosFunc.getPriority(header, scanRequest, createSomeUser()));
|
||||||
|
|
||||||
assertEquals(
|
// the same as above but with non-meta region
|
||||||
HConstants.SYSTEMTABLE_QOS,
|
|
||||||
PRIORITY.getPriority(header, scanRequest, createSomeUser()));
|
|
||||||
|
|
||||||
//the same as above but with non-meta region
|
|
||||||
// make isSystemTable return false
|
// make isSystemTable return false
|
||||||
Mockito.when(mockRegionInfo.getTable())
|
when(mockRegionInfo.getTable()).thenReturn(TableName.valueOf("testQosFunctionForScanMethod"));
|
||||||
.thenReturn(TableName.valueOf("testQosFunctionForScanMethod"));
|
assertEquals(HConstants.NORMAL_QOS, qosFunc.getPriority(header, scanRequest, createSomeUser()));
|
||||||
assertEquals(
|
|
||||||
HConstants.NORMAL_QOS,
|
|
||||||
PRIORITY.getPriority(header, scanRequest, createSomeUser()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static User createSomeUser() {
|
private static User createSomeUser() {
|
||||||
return User.createUserForTesting(UTIL.getConfiguration(), "someuser",
|
return User.createUserForTesting(CONF, "someuser", new String[] { "somegroup" });
|
||||||
new String[] { "somegroup" });
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.QosTestHelper;
|
import org.apache.hadoop.hbase.ipc.QosTestBase;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -39,21 +39,21 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiReque
|
||||||
* over in pb doesn't break it.
|
* over in pb doesn't break it.
|
||||||
*/
|
*/
|
||||||
@Category({RegionServerTests.class, MediumTests.class})
|
@Category({RegionServerTests.class, MediumTests.class})
|
||||||
public class TestQosFunction extends QosTestHelper {
|
public class TestRSQosFunction extends QosTestBase {
|
||||||
@ClassRule
|
@ClassRule
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
HBaseClassTestRule.forClass(TestQosFunction.class);
|
HBaseClassTestRule.forClass(TestRSQosFunction.class);
|
||||||
|
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private RSRpcServices rpcServices;
|
private RSRpcServices rpcServices;
|
||||||
private AnnotationReadingPriorityFunction qosFunction;
|
private RSAnnotationReadingPriorityFunction qosFunction;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
conf = HBaseConfiguration.create();
|
conf = HBaseConfiguration.create();
|
||||||
rpcServices = Mockito.mock(RSRpcServices.class);
|
rpcServices = Mockito.mock(RSRpcServices.class);
|
||||||
when(rpcServices.getConfiguration()).thenReturn(conf);
|
when(rpcServices.getConfiguration()).thenReturn(conf);
|
||||||
qosFunction = new AnnotationReadingPriorityFunction(rpcServices, RSRpcServices.class);
|
qosFunction = new RSAnnotationReadingPriorityFunction(rpcServices);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
Loading…
Reference in New Issue