diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java index e323e78bbad..f56bf6fdcd5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.security.User; /** * Function to figure priority of incoming request. @@ -34,9 +35,10 @@ public interface PriorityFunction { * The returned value is mainly used to select the dispatch queue. * @param header * @param param + * @param user * @return Priority of this request. */ - int getPriority(RequestHeader header, Message param); + int getPriority(RequestHeader header, Message param, User user); /** * Returns the deadline of the specified request. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 2889d8d9b88..b8e9c52189c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -192,7 +192,7 @@ public class SimpleRpcScheduler extends RpcScheduler { @Override public void dispatch(CallRunner callTask) throws InterruptedException { RpcServer.Call call = callTask.getCall(); - int level = priority.getPriority(call.getHeader(), call.param); + int level = priority.getPriority(call.getHeader(), call.param, call.getRequestUser()); if (priorityExecutor != null && level > highPriorityLevel) { priorityExecutor.dispatch(callTask); } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java index 880710a0b66..c96619bd9c3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java @@ -17,8 +17,11 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.IOException; import java.lang.reflect.Method; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; @@ -49,7 +52,9 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.Message; import com.google.protobuf.TextFormat; - +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.visibility.VisibilityUtils; +import org.apache.hadoop.hbase.util.Pair; /** * Reads special method annotations and table names to figure a priority for use by QoS facility in @@ -105,6 +110,11 @@ class AnnotationReadingPriorityFunction implements PriorityFunction { private final float scanVirtualTimeWeight; + // lists of super users and super groups, used to route rpc calls made by + // superusers through high-priority (ADMIN_QOS) thread pool. + // made protected for tests + protected final HashSet superUsers; + protected final HashSet superGroups; /** * Calls {@link #AnnotationReadingPriorityFunction(RSRpcServices, Class)} using the result of * {@code rpcServices#getClass()} @@ -158,6 +168,16 @@ class AnnotationReadingPriorityFunction implements PriorityFunction { Configuration conf = rpcServices.getConfiguration(); scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f); + + try { + // TODO Usage of VisibilityUtils API to be avoided with HBASE-13755 + Pair, List> pair = VisibilityUtils.getSystemAndSuperUsers(rpcServices + .getConfiguration()); + superUsers = new HashSet<>(pair.getFirst()); + superGroups = new HashSet<>(pair.getSecond()); + } catch (IOException e) { + throw new RuntimeException(e); + } } private String capitalize(final String s) { @@ -175,12 +195,18 @@ class AnnotationReadingPriorityFunction implements PriorityFunction { * NORMAL_QOS (user requests). */ @Override - public int getPriority(RequestHeader header, Message param) { + public int getPriority(RequestHeader header, Message param, User user) { String methodName = header.getMethodName(); Integer priorityByAnnotation = annotatedQos.get(methodName); if (priorityByAnnotation != null) { return priorityByAnnotation; } + + // all requests executed by super users have high QoS + if (isExecutedBySuperUser(user)) { + return HConstants.ADMIN_QOS; + } + if (param == null) { return HConstants.NORMAL_QOS; } @@ -280,4 +306,24 @@ class AnnotationReadingPriorityFunction implements PriorityFunction { void setRegionServer(final HRegionServer hrs) { this.rpcServices = hrs.getRSRpcServices(); } + + /** + * @param user user running request + * @return true if user is super user, false otherwise + */ + private boolean isExecutedBySuperUser(User user) { + if (superUsers.contains(user.getShortName())) { + return true; + } + + String[] groups = user.getGroupNames(); + if (groups != null) { + for (String group : groups) { + if (superGroups.contains(group)) { + return true; + } + } + } + return false; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 5a740fe89f4..7184b027ab0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -165,6 +165,7 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Counter; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -1010,8 +1011,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } @Override - public int getPriority(RequestHeader header, Message param) { - return priority.getPriority(header, param); + public int getPriority(RequestHeader header, Message param, User user) { + return priority.getPriority(header, param, user); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index 0f684a5b4d3..3bdffee8e2e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -27,6 +27,7 @@ import com.google.protobuf.Message; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.ipc.RpcServer.Call; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; @@ -124,7 +125,8 @@ public class TestSimpleRpcScheduler { scheduler.init(CONTEXT); scheduler.start(); for (CallRunner task : tasks) { - when(qosFunction.getPriority((RPCProtos.RequestHeader) anyObject(), (Message) anyObject())) + when(qosFunction.getPriority((RPCProtos.RequestHeader) anyObject(), + (Message) anyObject(), (User) anyObject())) .thenReturn(qos.get(task)); scheduler.dispatch(task); } @@ -156,7 +158,8 @@ public class TestSimpleRpcScheduler { schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, queueType); PriorityFunction priority = mock(PriorityFunction.class); - when(priority.getPriority(any(RequestHeader.class), any(Message.class))) + when(priority.getPriority(any(RequestHeader.class), + any(Message.class), any(User.class))) .thenReturn(HConstants.NORMAL_QOS); RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, @@ -234,8 +237,8 @@ public class TestSimpleRpcScheduler { schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); PriorityFunction priority = mock(PriorityFunction.class); - when(priority.getPriority(any(RequestHeader.class), any(Message.class))) - .thenReturn(HConstants.NORMAL_QOS); + when(priority.getPriority(any(RequestHeader.class), any(Message.class), + any(User.class))).thenReturn(HConstants.NORMAL_QOS); RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority, HConstants.QOS_THRESHOLD); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterPriorityRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterPriorityRpc.java index ae7a2f6f20e..ae55daf22f2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterPriorityRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterPriorityRpc.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.junit.Before; import org.junit.Test; @@ -42,6 +43,7 @@ import com.google.common.collect.Sets; public class TestMasterPriorityRpc { private HMaster master = null; private PriorityFunction priority = null; + private User user = null; private final Set ADMIN_METHODS = Sets.newHashSet("GetLastFlushedSequenceId", "RegionServerReport", "RegionServerStartup", "ReportRSFatalError", @@ -57,6 +59,7 @@ public class TestMasterPriorityRpc { CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); master = HMaster.constructMaster(HMaster.class, conf, cp); priority = master.getMasterRpcServices().getPriority(); + user = User.createUserForTesting(conf, "someuser", new String[]{"somegroup"}); } /** @@ -68,8 +71,8 @@ public class TestMasterPriorityRpc { * The expected priority. */ private void assertPriority(String methodName, int expectedPriority) { - assertEquals(methodName + " had unexpected priority", expectedPriority, - priority.getPriority(RequestHeader.newBuilder().setMethodName(methodName).build(), null)); + assertEquals(methodName + " had unexpected priority", expectedPriority, priority.getPriority( + RequestHeader.newBuilder().setMethodName(methodName).build(), null, user)); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java index 6e1c360146f..c82d2a7bc4a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; @@ -89,12 +90,13 @@ public class TestPriorityRpc { RSRpcServices mockRpc = Mockito.mock(RSRpcServices.class); Mockito.when(mockRS.getRSRpcServices()).thenReturn(mockRpc); HRegionInfo mockRegionInfo = Mockito.mock(HRegionInfo.class); - Mockito.when(mockRpc.getRegion((RegionSpecifier)Mockito.any())).thenReturn(mockRegion); + Mockito.when(mockRpc.getRegion((RegionSpecifier) Mockito.any())).thenReturn(mockRegion); Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo); Mockito.when(mockRegionInfo.isSystemTable()).thenReturn(true); // Presume type. ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS); - assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, getRequest)); + assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, getRequest, + User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"}))); } @Test @@ -107,7 +109,28 @@ public class TestPriorityRpc { headerBuilder.setMethodName("foo"); RequestHeader header = headerBuilder.build(); PriorityFunction qosFunc = regionServer.rpcServices.getPriority(); - assertEquals(HConstants.NORMAL_QOS, qosFunc.getPriority(header, null)); + assertEquals(HConstants.NORMAL_QOS, qosFunc.getPriority(header, null, + User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"}))); + } + + @Test + public void testQosFunctionForRequestCalledBySuperUser() throws Exception { + RequestHeader.Builder headerBuilder = RequestHeader.newBuilder(); + headerBuilder.setMethodName("foo"); + RequestHeader header = headerBuilder.build(); + PriorityFunction qosFunc = regionServer.rpcServices.getPriority(); + + //test superusers + ((AnnotationReadingPriorityFunction) qosFunc).superUsers.add("samplesuperuser"); + assertEquals(HConstants.ADMIN_QOS, qosFunc.getPriority(header, null, + User.createUserForTesting(regionServer.conf, "samplesuperuser", + new String[]{"somegroup"}))); + + //test supergroups + ((AnnotationReadingPriorityFunction) qosFunc).superGroups.add("samplesupergroup"); + assertEquals(HConstants.ADMIN_QOS, qosFunc.getPriority(header, null, + User.createUserForTesting(regionServer.conf, "regularuser", + new String[]{"samplesupergroup"}))); } @Test @@ -129,7 +152,8 @@ public class TestPriorityRpc { Mockito.when(mockRegionInfo.isSystemTable()).thenReturn(false); // Presume type. ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS); - int qos = priority.getPriority(header, scanRequest); + int qos = priority.getPriority(header, scanRequest, + User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"})); assertTrue ("" + qos, qos == HConstants.NORMAL_QOS); //build a scan request with scannerID @@ -147,10 +171,12 @@ public class TestPriorityRpc { // Presume type. ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS); - assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, scanRequest)); + assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, scanRequest, + User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"}))); //the same as above but with non-meta region Mockito.when(mockRegionInfo.isSystemTable()).thenReturn(false); - assertEquals(HConstants.NORMAL_QOS, priority.getPriority(header, scanRequest)); + assertEquals(HConstants.NORMAL_QOS, priority.getPriority(header, scanRequest, + User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"}))); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java index 4d5d7ef80f3..79ca11b2757 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; @@ -54,11 +55,12 @@ public class TestQosFunction { new AnnotationReadingPriorityFunction(rpcServices, RSRpcServices.class); // Set method name in pb style with the method name capitalized. - checkMethod("ReplicateWALEntry", HConstants.REPLICATION_QOS, qosFunction); + checkMethod(conf, "ReplicateWALEntry", HConstants.REPLICATION_QOS, qosFunction); // Set method name in pb style with the method name capitalized. - checkMethod("OpenRegion", HConstants.ADMIN_QOS, qosFunction); + checkMethod(conf, "OpenRegion", HConstants.ADMIN_QOS, qosFunction); // Check multi works. - checkMethod("Multi", HConstants.NORMAL_QOS, qosFunction, MultiRequest.getDefaultInstance()); + checkMethod(conf, "Multi", HConstants.NORMAL_QOS, qosFunction, + MultiRequest.getDefaultInstance()); } @@ -102,19 +104,21 @@ public class TestQosFunction { .addTransition(normalTransition).build(); final String reportFuncName = "ReportRegionStateTransition"; - checkMethod(reportFuncName, HConstants.SYSTEMTABLE_QOS, qosFunction, metaTransitionRequest); - checkMethod(reportFuncName, HConstants.NORMAL_QOS, qosFunction, normalTransitionRequest); + checkMethod(conf, reportFuncName, HConstants.SYSTEMTABLE_QOS, qosFunction, + metaTransitionRequest); + checkMethod(conf, reportFuncName, HConstants.NORMAL_QOS, qosFunction, normalTransitionRequest); } - private void checkMethod(final String methodName, final int expected, + private void checkMethod(Configuration conf, final String methodName, final int expected, final AnnotationReadingPriorityFunction qosf) { - checkMethod(methodName, expected, qosf, null); + checkMethod(conf, methodName, expected, qosf, null); } - private void checkMethod(final String methodName, final int expected, + private void checkMethod(Configuration conf, final String methodName, final int expected, final AnnotationReadingPriorityFunction qosf, final Message param) { RequestHeader.Builder builder = RequestHeader.newBuilder(); builder.setMethodName(methodName); - assertEquals(methodName, expected, qosf.getPriority(builder.build(), param)); + assertEquals(methodName, expected, qosf.getPriority(builder.build(), param, + User.createUserForTesting(conf, "someuser", new String[]{"somegroup"}))); } } \ No newline at end of file