HBASE-13375 Provide HBase superuser higher priority over other users in the RPC handling. (Mikhail Antonov)

This commit is contained in:
anoopsjohn 2015-05-24 21:28:50 +05:30
parent ee5263db8c
commit 2542a38e21
8 changed files with 112 additions and 27 deletions

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.security.User;
/** /**
* Function to figure priority of incoming request. * Function to figure priority of incoming request.
@ -34,9 +35,10 @@ public interface PriorityFunction {
* The returned value is mainly used to select the dispatch queue. * The returned value is mainly used to select the dispatch queue.
* @param header * @param header
* @param param * @param param
* @param user
* @return Priority of this request. * @return Priority of this request.
*/ */
int getPriority(RequestHeader header, Message param); int getPriority(RequestHeader header, Message param, User user);
/** /**
* Returns the deadline of the specified request. * Returns the deadline of the specified request.

View File

@ -192,7 +192,7 @@ public class SimpleRpcScheduler extends RpcScheduler {
@Override @Override
public void dispatch(CallRunner callTask) throws InterruptedException { public void dispatch(CallRunner callTask) throws InterruptedException {
RpcServer.Call call = callTask.getCall(); RpcServer.Call call = callTask.getCall();
int level = priority.getPriority(call.getHeader(), call.param); int level = priority.getPriority(call.getHeader(), call.param, call.getRequestUser());
if (priorityExecutor != null && level > highPriorityLevel) { if (priorityExecutor != null && level > highPriorityLevel) {
priorityExecutor.dispatch(callTask); priorityExecutor.dispatch(callTask);
} else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) { } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {

View File

@ -17,8 +17,11 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -49,7 +52,9 @@ import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import com.google.protobuf.TextFormat; import com.google.protobuf.TextFormat;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.visibility.VisibilityUtils;
import org.apache.hadoop.hbase.util.Pair;
/** /**
* Reads special method annotations and table names to figure a priority for use by QoS facility in * Reads special method annotations and table names to figure a priority for use by QoS facility in
@ -105,6 +110,11 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
private final float scanVirtualTimeWeight; private final float scanVirtualTimeWeight;
// lists of super users and super groups, used to route rpc calls made by
// superusers through high-priority (ADMIN_QOS) thread pool.
// made protected for tests
protected final HashSet<String> superUsers;
protected final HashSet<String> superGroups;
/** /**
* Calls {@link #AnnotationReadingPriorityFunction(RSRpcServices, Class)} using the result of * Calls {@link #AnnotationReadingPriorityFunction(RSRpcServices, Class)} using the result of
* {@code rpcServices#getClass()} * {@code rpcServices#getClass()}
@ -158,6 +168,16 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
Configuration conf = rpcServices.getConfiguration(); Configuration conf = rpcServices.getConfiguration();
scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f); scanVirtualTimeWeight = conf.getFloat(SCAN_VTIME_WEIGHT_CONF_KEY, 1.0f);
try {
// TODO Usage of VisibilityUtils API to be avoided with HBASE-13755
Pair<List<String>, List<String>> pair = VisibilityUtils.getSystemAndSuperUsers(rpcServices
.getConfiguration());
superUsers = new HashSet<>(pair.getFirst());
superGroups = new HashSet<>(pair.getSecond());
} catch (IOException e) {
throw new RuntimeException(e);
}
} }
private String capitalize(final String s) { private String capitalize(final String s) {
@ -175,12 +195,18 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
* NORMAL_QOS (user requests). * NORMAL_QOS (user requests).
*/ */
@Override @Override
public int getPriority(RequestHeader header, Message param) { public int getPriority(RequestHeader header, Message param, User user) {
String methodName = header.getMethodName(); String methodName = header.getMethodName();
Integer priorityByAnnotation = annotatedQos.get(methodName); Integer priorityByAnnotation = annotatedQos.get(methodName);
if (priorityByAnnotation != null) { if (priorityByAnnotation != null) {
return priorityByAnnotation; return priorityByAnnotation;
} }
// all requests executed by super users have high QoS
if (isExecutedBySuperUser(user)) {
return HConstants.ADMIN_QOS;
}
if (param == null) { if (param == null) {
return HConstants.NORMAL_QOS; return HConstants.NORMAL_QOS;
} }
@ -280,4 +306,24 @@ class AnnotationReadingPriorityFunction implements PriorityFunction {
void setRegionServer(final HRegionServer hrs) { void setRegionServer(final HRegionServer hrs) {
this.rpcServices = hrs.getRSRpcServices(); this.rpcServices = hrs.getRSRpcServices();
} }
/**
* @param user user running request
* @return true if user is super user, false otherwise
*/
private boolean isExecutedBySuperUser(User user) {
if (superUsers.contains(user.getShortName())) {
return true;
}
String[] groups = user.getGroupNames();
if (groups != null) {
for (String group : groups) {
if (superGroups.contains(group)) {
return true;
}
}
}
return false;
}
} }

View File

@ -165,6 +165,7 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Counter; import org.apache.hadoop.hbase.util.Counter;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -1010,8 +1011,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} }
@Override @Override
public int getPriority(RequestHeader header, Message param) { public int getPriority(RequestHeader header, Message param, User user) {
return priority.getPriority(header, param); return priority.getPriority(header, param, user);
} }
@Override @Override

View File

@ -27,6 +27,7 @@ import com.google.protobuf.Message;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.ipc.RpcServer.Call; import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
@ -124,7 +125,8 @@ public class TestSimpleRpcScheduler {
scheduler.init(CONTEXT); scheduler.init(CONTEXT);
scheduler.start(); scheduler.start();
for (CallRunner task : tasks) { for (CallRunner task : tasks) {
when(qosFunction.getPriority((RPCProtos.RequestHeader) anyObject(), (Message) anyObject())) when(qosFunction.getPriority((RPCProtos.RequestHeader) anyObject(),
(Message) anyObject(), (User) anyObject()))
.thenReturn(qos.get(task)); .thenReturn(qos.get(task));
scheduler.dispatch(task); scheduler.dispatch(task);
} }
@ -156,7 +158,8 @@ public class TestSimpleRpcScheduler {
schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, queueType); schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, queueType);
PriorityFunction priority = mock(PriorityFunction.class); PriorityFunction priority = mock(PriorityFunction.class);
when(priority.getPriority(any(RequestHeader.class), any(Message.class))) when(priority.getPriority(any(RequestHeader.class),
any(Message.class), any(User.class)))
.thenReturn(HConstants.NORMAL_QOS); .thenReturn(HConstants.NORMAL_QOS);
RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority, RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 1, 1, 1, priority,
@ -234,8 +237,8 @@ public class TestSimpleRpcScheduler {
schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); schedConf.setFloat(SimpleRpcScheduler.CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);
PriorityFunction priority = mock(PriorityFunction.class); PriorityFunction priority = mock(PriorityFunction.class);
when(priority.getPriority(any(RequestHeader.class), any(Message.class))) when(priority.getPriority(any(RequestHeader.class), any(Message.class),
.thenReturn(HConstants.NORMAL_QOS); any(User.class))).thenReturn(HConstants.NORMAL_QOS);
RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority, RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 1, 1, priority,
HConstants.QOS_THRESHOLD); HConstants.QOS_THRESHOLD);

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -42,6 +43,7 @@ import com.google.common.collect.Sets;
public class TestMasterPriorityRpc { public class TestMasterPriorityRpc {
private HMaster master = null; private HMaster master = null;
private PriorityFunction priority = null; private PriorityFunction priority = null;
private User user = null;
private final Set<String> ADMIN_METHODS = Sets.newHashSet("GetLastFlushedSequenceId", private final Set<String> ADMIN_METHODS = Sets.newHashSet("GetLastFlushedSequenceId",
"RegionServerReport", "RegionServerStartup", "ReportRSFatalError", "RegionServerReport", "RegionServerStartup", "ReportRSFatalError",
@ -57,6 +59,7 @@ public class TestMasterPriorityRpc {
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf); CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
master = HMaster.constructMaster(HMaster.class, conf, cp); master = HMaster.constructMaster(HMaster.class, conf, cp);
priority = master.getMasterRpcServices().getPriority(); priority = master.getMasterRpcServices().getPriority();
user = User.createUserForTesting(conf, "someuser", new String[]{"somegroup"});
} }
/** /**
@ -68,8 +71,8 @@ public class TestMasterPriorityRpc {
* The expected priority. * The expected priority.
*/ */
private void assertPriority(String methodName, int expectedPriority) { private void assertPriority(String methodName, int expectedPriority) {
assertEquals(methodName + " had unexpected priority", expectedPriority, assertEquals(methodName + " had unexpected priority", expectedPriority, priority.getPriority(
priority.getPriority(RequestHeader.newBuilder().setMethodName(methodName).build(), null)); RequestHeader.newBuilder().setMethodName(methodName).build(), null, user));
} }
@Test @Test

View File

@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
@ -89,12 +90,13 @@ public class TestPriorityRpc {
RSRpcServices mockRpc = Mockito.mock(RSRpcServices.class); RSRpcServices mockRpc = Mockito.mock(RSRpcServices.class);
Mockito.when(mockRS.getRSRpcServices()).thenReturn(mockRpc); Mockito.when(mockRS.getRSRpcServices()).thenReturn(mockRpc);
HRegionInfo mockRegionInfo = Mockito.mock(HRegionInfo.class); HRegionInfo mockRegionInfo = Mockito.mock(HRegionInfo.class);
Mockito.when(mockRpc.getRegion((RegionSpecifier)Mockito.any())).thenReturn(mockRegion); Mockito.when(mockRpc.getRegion((RegionSpecifier) Mockito.any())).thenReturn(mockRegion);
Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo); Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
Mockito.when(mockRegionInfo.isSystemTable()).thenReturn(true); Mockito.when(mockRegionInfo.isSystemTable()).thenReturn(true);
// Presume type. // Presume type.
((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS); ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS);
assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, getRequest)); assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, getRequest,
User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"})));
} }
@Test @Test
@ -107,7 +109,28 @@ public class TestPriorityRpc {
headerBuilder.setMethodName("foo"); headerBuilder.setMethodName("foo");
RequestHeader header = headerBuilder.build(); RequestHeader header = headerBuilder.build();
PriorityFunction qosFunc = regionServer.rpcServices.getPriority(); PriorityFunction qosFunc = regionServer.rpcServices.getPriority();
assertEquals(HConstants.NORMAL_QOS, qosFunc.getPriority(header, null)); assertEquals(HConstants.NORMAL_QOS, qosFunc.getPriority(header, null,
User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"})));
}
@Test
public void testQosFunctionForRequestCalledBySuperUser() throws Exception {
RequestHeader.Builder headerBuilder = RequestHeader.newBuilder();
headerBuilder.setMethodName("foo");
RequestHeader header = headerBuilder.build();
PriorityFunction qosFunc = regionServer.rpcServices.getPriority();
//test superusers
((AnnotationReadingPriorityFunction) qosFunc).superUsers.add("samplesuperuser");
assertEquals(HConstants.ADMIN_QOS, qosFunc.getPriority(header, null,
User.createUserForTesting(regionServer.conf, "samplesuperuser",
new String[]{"somegroup"})));
//test supergroups
((AnnotationReadingPriorityFunction) qosFunc).superGroups.add("samplesupergroup");
assertEquals(HConstants.ADMIN_QOS, qosFunc.getPriority(header, null,
User.createUserForTesting(regionServer.conf, "regularuser",
new String[]{"samplesupergroup"})));
} }
@Test @Test
@ -129,7 +152,8 @@ public class TestPriorityRpc {
Mockito.when(mockRegionInfo.isSystemTable()).thenReturn(false); Mockito.when(mockRegionInfo.isSystemTable()).thenReturn(false);
// Presume type. // Presume type.
((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS); ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS);
int qos = priority.getPriority(header, scanRequest); int qos = priority.getPriority(header, scanRequest,
User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"}));
assertTrue ("" + qos, qos == HConstants.NORMAL_QOS); assertTrue ("" + qos, qos == HConstants.NORMAL_QOS);
//build a scan request with scannerID //build a scan request with scannerID
@ -147,10 +171,12 @@ public class TestPriorityRpc {
// Presume type. // Presume type.
((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS); ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS);
assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, scanRequest)); assertEquals(HConstants.SYSTEMTABLE_QOS, priority.getPriority(header, scanRequest,
User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"})));
//the same as above but with non-meta region //the same as above but with non-meta region
Mockito.when(mockRegionInfo.isSystemTable()).thenReturn(false); Mockito.when(mockRegionInfo.isSystemTable()).thenReturn(false);
assertEquals(HConstants.NORMAL_QOS, priority.getPriority(header, scanRequest)); assertEquals(HConstants.NORMAL_QOS, priority.getPriority(header, scanRequest,
User.createUserForTesting(regionServer.conf, "someuser", new String[]{"somegroup"})));
} }
} }

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
@ -54,11 +55,12 @@ public class TestQosFunction {
new AnnotationReadingPriorityFunction(rpcServices, RSRpcServices.class); new AnnotationReadingPriorityFunction(rpcServices, RSRpcServices.class);
// Set method name in pb style with the method name capitalized. // Set method name in pb style with the method name capitalized.
checkMethod("ReplicateWALEntry", HConstants.REPLICATION_QOS, qosFunction); checkMethod(conf, "ReplicateWALEntry", HConstants.REPLICATION_QOS, qosFunction);
// Set method name in pb style with the method name capitalized. // Set method name in pb style with the method name capitalized.
checkMethod("OpenRegion", HConstants.ADMIN_QOS, qosFunction); checkMethod(conf, "OpenRegion", HConstants.ADMIN_QOS, qosFunction);
// Check multi works. // Check multi works.
checkMethod("Multi", HConstants.NORMAL_QOS, qosFunction, MultiRequest.getDefaultInstance()); checkMethod(conf, "Multi", HConstants.NORMAL_QOS, qosFunction,
MultiRequest.getDefaultInstance());
} }
@ -102,19 +104,21 @@ public class TestQosFunction {
.addTransition(normalTransition).build(); .addTransition(normalTransition).build();
final String reportFuncName = "ReportRegionStateTransition"; final String reportFuncName = "ReportRegionStateTransition";
checkMethod(reportFuncName, HConstants.SYSTEMTABLE_QOS, qosFunction, metaTransitionRequest); checkMethod(conf, reportFuncName, HConstants.SYSTEMTABLE_QOS, qosFunction,
checkMethod(reportFuncName, HConstants.NORMAL_QOS, qosFunction, normalTransitionRequest); metaTransitionRequest);
checkMethod(conf, reportFuncName, HConstants.NORMAL_QOS, qosFunction, normalTransitionRequest);
} }
private void checkMethod(final String methodName, final int expected, private void checkMethod(Configuration conf, final String methodName, final int expected,
final AnnotationReadingPriorityFunction qosf) { final AnnotationReadingPriorityFunction qosf) {
checkMethod(methodName, expected, qosf, null); checkMethod(conf, methodName, expected, qosf, null);
} }
private void checkMethod(final String methodName, final int expected, private void checkMethod(Configuration conf, final String methodName, final int expected,
final AnnotationReadingPriorityFunction qosf, final Message param) { final AnnotationReadingPriorityFunction qosf, final Message param) {
RequestHeader.Builder builder = RequestHeader.newBuilder(); RequestHeader.Builder builder = RequestHeader.newBuilder();
builder.setMethodName(methodName); builder.setMethodName(methodName);
assertEquals(methodName, expected, qosf.getPriority(builder.build(), param)); assertEquals(methodName, expected, qosf.getPriority(builder.build(), param,
User.createUserForTesting(conf, "someuser", new String[]{"somegroup"})));
} }
} }