HBASE-13917 Remove string comparison to identify request priority

This commit is contained in:
Matteo Bertozzi 2015-06-18 09:30:39 -07:00
parent a427f4c859
commit d628f95307
3 changed files with 16 additions and 11 deletions

View File

@ -33,6 +33,9 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated
.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
@ -173,8 +176,7 @@ public class RWQueueRpcExecutor extends RpcExecutor {
private boolean isWriteRequest(final RequestHeader header, final Message param) { private boolean isWriteRequest(final RequestHeader header, final Message param) {
// TODO: Is there a better way to do this? // TODO: Is there a better way to do this?
String methodName = header.getMethodName(); if (param instanceof MultiRequest) {
if (methodName.equalsIgnoreCase("multi") && param instanceof MultiRequest) {
MultiRequest multi = (MultiRequest)param; MultiRequest multi = (MultiRequest)param;
for (RegionAction regionAction : multi.getRegionActionList()) { for (RegionAction regionAction : multi.getRegionActionList()) {
for (Action action: regionAction.getActionList()) { for (Action action: regionAction.getActionList()) {
@ -184,18 +186,17 @@ public class RWQueueRpcExecutor extends RpcExecutor {
} }
} }
} }
if (methodName.equalsIgnoreCase("mutate")) { if (param instanceof MutateRequest) {
return true; return true;
} }
if (methodName.equalsIgnoreCase("ReportRegionStateTransition")) { if (param instanceof ReportRegionStateTransitionRequest) {
return true; return true;
} }
return false; return false;
} }
private boolean isScanRequest(final RequestHeader header, final Message param) { private boolean isScanRequest(final RequestHeader header, final Message param) {
String methodName = header.getMethodName(); if (param instanceof ScanRequest) {
if (methodName.equalsIgnoreCase("scan")) {
// The first scan request will be executed as a "short read" // The first scan request will be executed as a "short read"
ScanRequest request = (ScanRequest)param; ScanRequest request = (ScanRequest)param;
return request.hasScannerId(); return request.hasScannerId();

View File

@ -198,7 +198,7 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
if (param == null) { if (param == null) {
return HConstants.NORMAL_QOS; return HConstants.NORMAL_QOS;
} }
if (methodName.equalsIgnoreCase("multi") && param instanceof MultiRequest) { if (param instanceof MultiRequest) {
// The multi call has its priority set in the header. All calls should work this way but // The multi call has its priority set in the header. All calls should work this way but
// only this one has been converted so far. No priority == NORMAL_QOS. // only this one has been converted so far. No priority == NORMAL_QOS.
return header.hasPriority()? header.getPriority(): HConstants.NORMAL_QOS; return header.hasPriority()? header.getPriority(): HConstants.NORMAL_QOS;
@ -232,7 +232,7 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
return HConstants.NORMAL_QOS; return HConstants.NORMAL_QOS;
} }
if (methodName.equalsIgnoreCase("scan")) { // scanner methods... if (param instanceof ScanRequest) { // scanner methods...
ScanRequest request = (ScanRequest)param; ScanRequest request = (ScanRequest)param;
if (!request.hasScannerId()) { if (!request.hasScannerId()) {
return HConstants.NORMAL_QOS; return HConstants.NORMAL_QOS;
@ -249,7 +249,7 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
// If meta is moving then all the rest of report the report state transitions will be // If meta is moving then all the rest of report the report state transitions will be
// blocked. We shouldn't be in the same queue. // blocked. We shouldn't be in the same queue.
if (methodName.equalsIgnoreCase("ReportRegionStateTransition")) { // Regions are moving if (param instanceof ReportRegionStateTransitionRequest) { // Regions are moving
ReportRegionStateTransitionRequest tRequest = (ReportRegionStateTransitionRequest) param; ReportRegionStateTransitionRequest tRequest = (ReportRegionStateTransitionRequest) param;
for (RegionStateTransition transition : tRequest.getTransitionList()) { for (RegionStateTransition transition : tRequest.getTransitionList()) {
if (transition.getRegionInfoList() != null) { if (transition.getRegionInfoList() != null) {
@ -274,8 +274,7 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
*/ */
@Override @Override
public long getDeadline(RequestHeader header, Message param) { public long getDeadline(RequestHeader header, Message param) {
String methodName = header.getMethodName(); if (param instanceof ScanRequest) {
if (methodName.equalsIgnoreCase("scan")) {
ScanRequest request = (ScanRequest)param; ScanRequest request = (ScanRequest)param;
if (!request.hasScannerId()) { if (!request.hasScannerId()) {
return 0; return 0;

View File

@ -27,12 +27,15 @@ import com.google.protobuf.Message;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.ipc.RpcServer.Call; import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -247,6 +250,8 @@ public class TestSimpleRpcScheduler {
CallRunner putCallTask = mock(CallRunner.class); CallRunner putCallTask = mock(CallRunner.class);
RpcServer.Call putCall = mock(RpcServer.Call.class); RpcServer.Call putCall = mock(RpcServer.Call.class);
putCall.param = RequestConverter.buildMutateRequest(
Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build(); RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
when(putCallTask.getCall()).thenReturn(putCall); when(putCallTask.getCall()).thenReturn(putCall);
when(putCall.getHeader()).thenReturn(putHead); when(putCall.getHeader()).thenReturn(putHead);