HBASE-13917 Remove string comparison to identify request priority
This commit is contained in:
parent
51b606cd18
commit
fedfe878f4
|
@ -33,6 +33,9 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated
|
||||||
|
.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
|
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
|
||||||
|
@ -173,8 +176,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
|
||||||
|
|
||||||
private boolean isWriteRequest(final RequestHeader header, final Message param) {
|
private boolean isWriteRequest(final RequestHeader header, final Message param) {
|
||||||
// TODO: Is there a better way to do this?
|
// TODO: Is there a better way to do this?
|
||||||
String methodName = header.getMethodName();
|
if (param instanceof MultiRequest) {
|
||||||
if (methodName.equalsIgnoreCase("multi") && param instanceof MultiRequest) {
|
|
||||||
MultiRequest multi = (MultiRequest)param;
|
MultiRequest multi = (MultiRequest)param;
|
||||||
for (RegionAction regionAction : multi.getRegionActionList()) {
|
for (RegionAction regionAction : multi.getRegionActionList()) {
|
||||||
for (Action action: regionAction.getActionList()) {
|
for (Action action: regionAction.getActionList()) {
|
||||||
|
@ -184,18 +186,17 @@ public class RWQueueRpcExecutor extends RpcExecutor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (methodName.equalsIgnoreCase("mutate")) {
|
if (param instanceof MutateRequest) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if (methodName.equalsIgnoreCase("ReportRegionStateTransition")) {
|
if (param instanceof ReportRegionStateTransitionRequest) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isScanRequest(final RequestHeader header, final Message param) {
|
private boolean isScanRequest(final RequestHeader header, final Message param) {
|
||||||
String methodName = header.getMethodName();
|
if (param instanceof ScanRequest) {
|
||||||
if (methodName.equalsIgnoreCase("scan")) {
|
|
||||||
// The first scan request will be executed as a "short read"
|
// The first scan request will be executed as a "short read"
|
||||||
ScanRequest request = (ScanRequest)param;
|
ScanRequest request = (ScanRequest)param;
|
||||||
return request.hasScannerId();
|
return request.hasScannerId();
|
||||||
|
|
|
@ -198,7 +198,7 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
|
||||||
if (param == null) {
|
if (param == null) {
|
||||||
return HConstants.NORMAL_QOS;
|
return HConstants.NORMAL_QOS;
|
||||||
}
|
}
|
||||||
if (methodName.equalsIgnoreCase("multi") && param instanceof MultiRequest) {
|
if (param instanceof MultiRequest) {
|
||||||
// The multi call has its priority set in the header. All calls should work this way but
|
// The multi call has its priority set in the header. All calls should work this way but
|
||||||
// only this one has been converted so far. No priority == NORMAL_QOS.
|
// only this one has been converted so far. No priority == NORMAL_QOS.
|
||||||
return header.hasPriority()? header.getPriority(): HConstants.NORMAL_QOS;
|
return header.hasPriority()? header.getPriority(): HConstants.NORMAL_QOS;
|
||||||
|
@ -232,7 +232,7 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
|
||||||
return HConstants.NORMAL_QOS;
|
return HConstants.NORMAL_QOS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (methodName.equalsIgnoreCase("scan")) { // scanner methods...
|
if (param instanceof ScanRequest) { // scanner methods...
|
||||||
ScanRequest request = (ScanRequest)param;
|
ScanRequest request = (ScanRequest)param;
|
||||||
if (!request.hasScannerId()) {
|
if (!request.hasScannerId()) {
|
||||||
return HConstants.NORMAL_QOS;
|
return HConstants.NORMAL_QOS;
|
||||||
|
@ -249,7 +249,7 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
|
||||||
|
|
||||||
// If meta is moving then all the rest of report the report state transitions will be
|
// If meta is moving then all the rest of report the report state transitions will be
|
||||||
// blocked. We shouldn't be in the same queue.
|
// blocked. We shouldn't be in the same queue.
|
||||||
if (methodName.equalsIgnoreCase("ReportRegionStateTransition")) { // Regions are moving
|
if (param instanceof ReportRegionStateTransitionRequest) { // Regions are moving
|
||||||
ReportRegionStateTransitionRequest tRequest = (ReportRegionStateTransitionRequest) param;
|
ReportRegionStateTransitionRequest tRequest = (ReportRegionStateTransitionRequest) param;
|
||||||
for (RegionStateTransition transition : tRequest.getTransitionList()) {
|
for (RegionStateTransition transition : tRequest.getTransitionList()) {
|
||||||
if (transition.getRegionInfoList() != null) {
|
if (transition.getRegionInfoList() != null) {
|
||||||
|
@ -274,8 +274,7 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public long getDeadline(RequestHeader header, Message param) {
|
public long getDeadline(RequestHeader header, Message param) {
|
||||||
String methodName = header.getMethodName();
|
if (param instanceof ScanRequest) {
|
||||||
if (methodName.equalsIgnoreCase("scan")) {
|
|
||||||
ScanRequest request = (ScanRequest)param;
|
ScanRequest request = (ScanRequest)param;
|
||||||
if (!request.hasScannerId()) {
|
if (!request.hasScannerId()) {
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -27,13 +27,16 @@ import com.google.protobuf.Message;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.testclassification.RPCTests;
|
import org.apache.hadoop.hbase.testclassification.RPCTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
|
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
|
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
|
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -248,6 +251,8 @@ public class TestSimpleRpcScheduler {
|
||||||
|
|
||||||
CallRunner putCallTask = mock(CallRunner.class);
|
CallRunner putCallTask = mock(CallRunner.class);
|
||||||
RpcServer.Call putCall = mock(RpcServer.Call.class);
|
RpcServer.Call putCall = mock(RpcServer.Call.class);
|
||||||
|
putCall.param = RequestConverter.buildMutateRequest(
|
||||||
|
Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
|
||||||
RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
|
RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
|
||||||
when(putCallTask.getCall()).thenReturn(putCall);
|
when(putCallTask.getCall()).thenReturn(putCall);
|
||||||
when(putCall.getHeader()).thenReturn(putHead);
|
when(putCall.getHeader()).thenReturn(putHead);
|
||||||
|
|
Loading…
Reference in New Issue