HBASE-21713 Support set region server throttle quota
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
3e72cb73ae
commit
1e7f306e17
|
@ -33,6 +33,7 @@ public class QuotaFilter {
|
|||
private String namespaceRegex;
|
||||
private String tableRegex;
|
||||
private String userRegex;
|
||||
private String regionServerRegex;
|
||||
|
||||
public QuotaFilter() {
|
||||
}
|
||||
|
@ -70,6 +71,17 @@ public class QuotaFilter {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the region server filter regex
|
||||
* @param regex the region server filter
|
||||
* @return the quota filter object
|
||||
*/
|
||||
public QuotaFilter setRegionServerFilter(final String regex) {
|
||||
this.regionServerRegex = regex;
|
||||
hasFilters |= StringUtils.isNotEmpty(regex);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a type to the filter list
|
||||
* @param type the type to filter on
|
||||
|
@ -105,4 +117,9 @@ public class QuotaFilter {
|
|||
public String getUserFilter() {
|
||||
return userRegex;
|
||||
}
|
||||
|
||||
/** @return the RegionServer filter regex */
|
||||
public String getRegionServerFilter() {
|
||||
return regionServerRegex;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,24 +22,26 @@ import java.util.Objects;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.quotas.QuotaSettingsFactory.QuotaGlobalsSettingsBypass;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
|
||||
import org.apache.hadoop.hbase.quotas.QuotaSettingsFactory.QuotaGlobalsSettingsBypass;
|
||||
|
||||
@InterfaceAudience.Public
|
||||
public abstract class QuotaSettings {
|
||||
private final String userName;
|
||||
private final String namespace;
|
||||
private final TableName tableName;
|
||||
private final String regionServer;
|
||||
|
||||
protected QuotaSettings(final String userName, final TableName tableName,
|
||||
final String namespace) {
|
||||
protected QuotaSettings(final String userName, final TableName tableName, final String namespace,
|
||||
final String regionServer) {
|
||||
this.userName = userName;
|
||||
this.namespace = namespace;
|
||||
this.tableName = tableName;
|
||||
this.regionServer = regionServer;
|
||||
}
|
||||
|
||||
public abstract QuotaType getQuotaType();
|
||||
|
@ -56,6 +58,10 @@ public abstract class QuotaSettings {
|
|||
return namespace;
|
||||
}
|
||||
|
||||
public String getRegionServer() {
|
||||
return regionServer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts the protocol buffer request into a QuotaSetting POJO. Arbitrarily
|
||||
* enforces that the request only contain one "limit", despite the message
|
||||
|
@ -78,6 +84,10 @@ public abstract class QuotaSettings {
|
|||
if (request.hasNamespace()) {
|
||||
namespace = request.getNamespace();
|
||||
}
|
||||
String regionServer = null;
|
||||
if (request.hasRegionServer()) {
|
||||
regionServer = request.getRegionServer();
|
||||
}
|
||||
if (request.hasBypassGlobals()) {
|
||||
// Make sure we don't have either of the two below limits also included
|
||||
if (request.hasSpaceLimit() || request.hasThrottle()) {
|
||||
|
@ -85,7 +95,7 @@ public abstract class QuotaSettings {
|
|||
"SetQuotaRequest has multiple limits: " + TextFormat.shortDebugString(request));
|
||||
}
|
||||
return new QuotaGlobalsSettingsBypass(
|
||||
username, tableName, namespace, request.getBypassGlobals());
|
||||
username, tableName, namespace, regionServer, request.getBypassGlobals());
|
||||
} else if (request.hasSpaceLimit()) {
|
||||
// Make sure we don't have the below limit as well
|
||||
if (request.hasThrottle()) {
|
||||
|
@ -100,7 +110,8 @@ public abstract class QuotaSettings {
|
|||
return QuotaSettingsFactory.fromSpace(
|
||||
tableName, namespace, request.getSpaceLimit().getQuota());
|
||||
} else if (request.hasThrottle()) {
|
||||
return new ThrottleSettings(username, tableName, namespace, request.getThrottle());
|
||||
return new ThrottleSettings(username, tableName, namespace, regionServer,
|
||||
request.getThrottle());
|
||||
} else {
|
||||
throw new IllegalStateException("Unhandled SetRequestRequest state");
|
||||
}
|
||||
|
@ -123,6 +134,9 @@ public abstract class QuotaSettings {
|
|||
if (settings.getNamespace() != null) {
|
||||
builder.setNamespace(settings.getNamespace());
|
||||
}
|
||||
if (settings.getRegionServer() != null) {
|
||||
builder.setRegionServer(settings.getRegionServer());
|
||||
}
|
||||
settings.setupSetQuotaRequest(builder);
|
||||
return builder.build();
|
||||
}
|
||||
|
@ -152,6 +166,9 @@ public abstract class QuotaSettings {
|
|||
builder.append(namespace);
|
||||
builder.append("', ");
|
||||
}
|
||||
if (regionServer != null) {
|
||||
builder.append("REGIONSERVER => ").append(regionServer).append(", ");
|
||||
}
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
|
@ -203,5 +220,8 @@ public abstract class QuotaSettings {
|
|||
if (!Objects.equals(getNamespace(), mergee.getNamespace())) {
|
||||
throw new IllegalArgumentException("Mismatched namespace on settings to merge");
|
||||
}
|
||||
if (!Objects.equals(getRegionServer(), mergee.getRegionServer())) {
|
||||
throw new IllegalArgumentException("Mismatched region server on settings to merge");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,8 +25,8 @@ import java.util.concurrent.TimeUnit;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
|
||||
|
@ -37,8 +37,8 @@ public class QuotaSettingsFactory {
|
|||
private final boolean bypassGlobals;
|
||||
|
||||
QuotaGlobalsSettingsBypass(final String userName, final TableName tableName,
|
||||
final String namespace, final boolean bypassGlobals) {
|
||||
super(userName, tableName, namespace);
|
||||
final String namespace, final String regionServer, final boolean bypassGlobals) {
|
||||
super(userName, tableName, namespace, regionServer);
|
||||
this.bypassGlobals = bypassGlobals;
|
||||
}
|
||||
|
||||
|
@ -80,35 +80,42 @@ public class QuotaSettingsFactory {
|
|||
* QuotaSettings from the Quotas object
|
||||
*/
|
||||
static List<QuotaSettings> fromUserQuotas(final String userName, final Quotas quotas) {
|
||||
return fromQuotas(userName, null, null, quotas);
|
||||
return fromQuotas(userName, null, null, null, quotas);
|
||||
}
|
||||
|
||||
static List<QuotaSettings> fromUserQuotas(final String userName, final TableName tableName,
|
||||
final Quotas quotas) {
|
||||
return fromQuotas(userName, tableName, null, quotas);
|
||||
return fromQuotas(userName, tableName, null, null, quotas);
|
||||
}
|
||||
|
||||
static List<QuotaSettings> fromUserQuotas(final String userName, final String namespace,
|
||||
final Quotas quotas) {
|
||||
return fromQuotas(userName, null, namespace, quotas);
|
||||
return fromQuotas(userName, null, namespace, null, quotas);
|
||||
}
|
||||
|
||||
static List<QuotaSettings> fromTableQuotas(final TableName tableName, final Quotas quotas) {
|
||||
return fromQuotas(null, tableName, null, quotas);
|
||||
return fromQuotas(null, tableName, null, null, quotas);
|
||||
}
|
||||
|
||||
static List<QuotaSettings> fromNamespaceQuotas(final String namespace, final Quotas quotas) {
|
||||
return fromQuotas(null, null, namespace, quotas);
|
||||
return fromQuotas(null, null, namespace, null, quotas);
|
||||
}
|
||||
|
||||
static List<QuotaSettings> fromRegionServerQuotas(final String regionServer,
|
||||
final Quotas quotas) {
|
||||
return fromQuotas(null, null, null, regionServer, quotas);
|
||||
}
|
||||
|
||||
private static List<QuotaSettings> fromQuotas(final String userName, final TableName tableName,
|
||||
final String namespace, final Quotas quotas) {
|
||||
final String namespace, final String regionServer, final Quotas quotas) {
|
||||
List<QuotaSettings> settings = new ArrayList<>();
|
||||
if (quotas.hasThrottle()) {
|
||||
settings.addAll(fromThrottle(userName, tableName, namespace, quotas.getThrottle()));
|
||||
settings
|
||||
.addAll(fromThrottle(userName, tableName, namespace, regionServer, quotas.getThrottle()));
|
||||
}
|
||||
if (quotas.getBypassGlobals() == true) {
|
||||
settings.add(new QuotaGlobalsSettingsBypass(userName, tableName, namespace, true));
|
||||
settings
|
||||
.add(new QuotaGlobalsSettingsBypass(userName, tableName, namespace, regionServer, true));
|
||||
}
|
||||
if (quotas.hasSpace()) {
|
||||
settings.add(fromSpace(tableName, namespace, quotas.getSpace()));
|
||||
|
@ -116,43 +123,44 @@ public class QuotaSettingsFactory {
|
|||
return settings;
|
||||
}
|
||||
|
||||
protected static List<QuotaSettings> fromThrottle(final String userName, final TableName tableName,
|
||||
final String namespace, final QuotaProtos.Throttle throttle) {
|
||||
protected static List<QuotaSettings> fromThrottle(final String userName,
|
||||
final TableName tableName, final String namespace, final String regionServer,
|
||||
final QuotaProtos.Throttle throttle) {
|
||||
List<QuotaSettings> settings = new ArrayList<>();
|
||||
if (throttle.hasReqNum()) {
|
||||
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
|
||||
ThrottleType.REQUEST_NUMBER, throttle.getReqNum()));
|
||||
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace, regionServer,
|
||||
ThrottleType.REQUEST_NUMBER, throttle.getReqNum()));
|
||||
}
|
||||
if (throttle.hasReqSize()) {
|
||||
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
|
||||
ThrottleType.REQUEST_SIZE, throttle.getReqSize()));
|
||||
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace, regionServer,
|
||||
ThrottleType.REQUEST_SIZE, throttle.getReqSize()));
|
||||
}
|
||||
if (throttle.hasWriteNum()) {
|
||||
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
|
||||
ThrottleType.WRITE_NUMBER, throttle.getWriteNum()));
|
||||
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace, regionServer,
|
||||
ThrottleType.WRITE_NUMBER, throttle.getWriteNum()));
|
||||
}
|
||||
if (throttle.hasWriteSize()) {
|
||||
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
|
||||
ThrottleType.WRITE_SIZE, throttle.getWriteSize()));
|
||||
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace, regionServer,
|
||||
ThrottleType.WRITE_SIZE, throttle.getWriteSize()));
|
||||
}
|
||||
if (throttle.hasReadNum()) {
|
||||
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
|
||||
ThrottleType.READ_NUMBER, throttle.getReadNum()));
|
||||
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace, regionServer,
|
||||
ThrottleType.READ_NUMBER, throttle.getReadNum()));
|
||||
}
|
||||
if (throttle.hasReadSize()) {
|
||||
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
|
||||
ThrottleType.READ_SIZE, throttle.getReadSize()));
|
||||
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace, regionServer,
|
||||
ThrottleType.READ_SIZE, throttle.getReadSize()));
|
||||
}
|
||||
if (throttle.hasReqCapacityUnit()) {
|
||||
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
|
||||
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace, regionServer,
|
||||
ThrottleType.REQUEST_CAPACITY_UNIT, throttle.getReqCapacityUnit()));
|
||||
}
|
||||
if (throttle.hasReadCapacityUnit()) {
|
||||
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
|
||||
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace, regionServer,
|
||||
ThrottleType.READ_CAPACITY_UNIT, throttle.getReadCapacityUnit()));
|
||||
}
|
||||
if (throttle.hasWriteCapacityUnit()) {
|
||||
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
|
||||
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace, regionServer,
|
||||
ThrottleType.WRITE_CAPACITY_UNIT, throttle.getWriteCapacityUnit()));
|
||||
}
|
||||
return settings;
|
||||
|
@ -195,7 +203,7 @@ public class QuotaSettingsFactory {
|
|||
*/
|
||||
public static QuotaSettings throttleUser(final String userName, final ThrottleType type,
|
||||
final long limit, final TimeUnit timeUnit) {
|
||||
return throttle(userName, null, null, type, limit, timeUnit);
|
||||
return throttle(userName, null, null, null, type, limit, timeUnit);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -210,7 +218,7 @@ public class QuotaSettingsFactory {
|
|||
*/
|
||||
public static QuotaSettings throttleUser(final String userName, final TableName tableName,
|
||||
final ThrottleType type, final long limit, final TimeUnit timeUnit) {
|
||||
return throttle(userName, tableName, null, type, limit, timeUnit);
|
||||
return throttle(userName, tableName, null, null, type, limit, timeUnit);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -225,7 +233,7 @@ public class QuotaSettingsFactory {
|
|||
*/
|
||||
public static QuotaSettings throttleUser(final String userName, final String namespace,
|
||||
final ThrottleType type, final long limit, final TimeUnit timeUnit) {
|
||||
return throttle(userName, null, namespace, type, limit, timeUnit);
|
||||
return throttle(userName, null, namespace, null, type, limit, timeUnit);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -235,7 +243,7 @@ public class QuotaSettingsFactory {
|
|||
* @return the quota settings
|
||||
*/
|
||||
public static QuotaSettings unthrottleUser(final String userName) {
|
||||
return throttle(userName, null, null, null, 0, null);
|
||||
return throttle(userName, null, null, null, null, 0, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -246,7 +254,7 @@ public class QuotaSettingsFactory {
|
|||
* @return the quota settings
|
||||
*/
|
||||
public static QuotaSettings unthrottleUser(final String userName, final TableName tableName) {
|
||||
return throttle(userName, tableName, null, null, 0, null);
|
||||
return throttle(userName, tableName, null, null, null, 0, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -257,7 +265,7 @@ public class QuotaSettingsFactory {
|
|||
* @return the quota settings
|
||||
*/
|
||||
public static QuotaSettings unthrottleUser(final String userName, final String namespace) {
|
||||
return throttle(userName, null, namespace, null, 0, null);
|
||||
return throttle(userName, null, namespace, null, null, 0, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -271,7 +279,7 @@ public class QuotaSettingsFactory {
|
|||
*/
|
||||
public static QuotaSettings throttleTable(final TableName tableName, final ThrottleType type,
|
||||
final long limit, final TimeUnit timeUnit) {
|
||||
return throttle(null, tableName, null, type, limit, timeUnit);
|
||||
return throttle(null, tableName, null, null, type, limit, timeUnit);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -281,7 +289,7 @@ public class QuotaSettingsFactory {
|
|||
* @return the quota settings
|
||||
*/
|
||||
public static QuotaSettings unthrottleTable(final TableName tableName) {
|
||||
return throttle(null, tableName, null, null, 0, null);
|
||||
return throttle(null, tableName, null, null, null, 0, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -295,7 +303,7 @@ public class QuotaSettingsFactory {
|
|||
*/
|
||||
public static QuotaSettings throttleNamespace(final String namespace, final ThrottleType type,
|
||||
final long limit, final TimeUnit timeUnit) {
|
||||
return throttle(null, null, namespace, type, limit, timeUnit);
|
||||
return throttle(null, null, namespace, null, type, limit, timeUnit);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -305,12 +313,36 @@ public class QuotaSettingsFactory {
|
|||
* @return the quota settings
|
||||
*/
|
||||
public static QuotaSettings unthrottleNamespace(final String namespace) {
|
||||
return throttle(null, null, namespace, null, 0, null);
|
||||
return throttle(null, null, namespace, null, null, 0, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Throttle the specified region server.
|
||||
*
|
||||
* @param regionServer the region server to throttle
|
||||
* @param type the type of throttling
|
||||
* @param limit the allowed number of request/data per timeUnit
|
||||
* @param timeUnit the limit time unit
|
||||
* @return the quota settings
|
||||
*/
|
||||
public static QuotaSettings throttleRegionServer(final String regionServer,
|
||||
final ThrottleType type, final long limit, final TimeUnit timeUnit) {
|
||||
return throttle(null, null, null, regionServer, type, limit, timeUnit);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove the throttling for the specified region server.
|
||||
*
|
||||
* @param regionServer the region Server
|
||||
* @return the quota settings
|
||||
*/
|
||||
public static QuotaSettings unthrottleRegionServer(final String regionServer) {
|
||||
return throttle(null, null, null, regionServer, null, 0, null);
|
||||
}
|
||||
|
||||
/* Throttle helper */
|
||||
private static QuotaSettings throttle(final String userName, final TableName tableName,
|
||||
final String namespace, final ThrottleType type, final long limit,
|
||||
final String namespace, final String regionServer, final ThrottleType type, final long limit,
|
||||
final TimeUnit timeUnit) {
|
||||
QuotaProtos.ThrottleRequest.Builder builder = QuotaProtos.ThrottleRequest.newBuilder();
|
||||
if (type != null) {
|
||||
|
@ -319,7 +351,7 @@ public class QuotaSettingsFactory {
|
|||
if (timeUnit != null) {
|
||||
builder.setTimedQuota(ProtobufUtil.toTimedQuota(limit, timeUnit, QuotaScope.MACHINE));
|
||||
}
|
||||
return new ThrottleSettings(userName, tableName, namespace, builder.build());
|
||||
return new ThrottleSettings(userName, tableName, namespace, regionServer, builder.build());
|
||||
}
|
||||
|
||||
/* ==========================================================================
|
||||
|
@ -334,7 +366,7 @@ public class QuotaSettingsFactory {
|
|||
* @return the quota settings
|
||||
*/
|
||||
public static QuotaSettings bypassGlobals(final String userName, final boolean bypassGlobals) {
|
||||
return new QuotaGlobalsSettingsBypass(userName, null, null, bypassGlobals);
|
||||
return new QuotaGlobalsSettingsBypass(userName, null, null, null, bypassGlobals);
|
||||
}
|
||||
|
||||
/* ==========================================================================
|
||||
|
|
|
@ -97,6 +97,13 @@ public class QuotaTableUtil {
|
|||
protected static final byte[] QUOTA_USER_ROW_KEY_PREFIX = Bytes.toBytes("u.");
|
||||
protected static final byte[] QUOTA_TABLE_ROW_KEY_PREFIX = Bytes.toBytes("t.");
|
||||
protected static final byte[] QUOTA_NAMESPACE_ROW_KEY_PREFIX = Bytes.toBytes("n.");
|
||||
protected static final byte[] QUOTA_REGION_SERVER_ROW_KEY_PREFIX = Bytes.toBytes("r.");
|
||||
|
||||
/*
|
||||
* TODO: Setting specified region server quota isn't supported currently and the row key "r.all"
|
||||
* represents the throttle quota of all region servers
|
||||
*/
|
||||
public static final String QUOTA_REGION_SERVER_ROW_KEY = "all";
|
||||
|
||||
/* =========================================================================
|
||||
* Quota "settings" helpers
|
||||
|
@ -132,6 +139,11 @@ public class QuotaTableUtil {
|
|||
return getQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS);
|
||||
}
|
||||
|
||||
public static Quotas getRegionServerQuota(final Connection connection, final String regionServer)
|
||||
throws IOException {
|
||||
return getQuotas(connection, getRegionServerRowKey(regionServer));
|
||||
}
|
||||
|
||||
private static Quotas getQuotas(final Connection connection, final byte[] rowKey,
|
||||
final byte[] qualifier) throws IOException {
|
||||
Get get = new Get(rowKey);
|
||||
|
@ -155,6 +167,12 @@ public class QuotaTableUtil {
|
|||
return get;
|
||||
}
|
||||
|
||||
public static Get makeGetForRegionServerQuotas(final String regionServer) {
|
||||
Get get = new Get(getRegionServerRowKey(regionServer));
|
||||
get.addFamily(QUOTA_FAMILY_INFO);
|
||||
return get;
|
||||
}
|
||||
|
||||
public static Get makeGetForUserQuotas(final String user, final Iterable<TableName> tables,
|
||||
final Iterable<String> namespaces) {
|
||||
Get get = new Get(getUserRowKey(user));
|
||||
|
@ -218,6 +236,9 @@ public class QuotaTableUtil {
|
|||
} else if (StringUtils.isNotEmpty(filter.getNamespaceFilter())) {
|
||||
filterList.addFilter(new RowFilter(CompareOperator.EQUAL,
|
||||
new RegexStringComparator(getNamespaceRowKeyRegex(filter.getNamespaceFilter()), 0)));
|
||||
} else if (StringUtils.isNotEmpty(filter.getRegionServerFilter())) {
|
||||
filterList.addFilter(new RowFilter(CompareOperator.EQUAL, new RegexStringComparator(
|
||||
getRegionServerRowKeyRegex(filter.getRegionServerFilter()), 0)));
|
||||
}
|
||||
return filterList;
|
||||
}
|
||||
|
@ -316,8 +337,13 @@ public class QuotaTableUtil {
|
|||
throws IOException;
|
||||
}
|
||||
|
||||
public static interface QuotasVisitor extends UserQuotasVisitor,
|
||||
TableQuotasVisitor, NamespaceQuotasVisitor {
|
||||
private static interface RegionServerQuotasVisitor {
|
||||
void visitRegionServerQuotas(final String regionServer, final Quotas quotas)
|
||||
throws IOException;
|
||||
}
|
||||
|
||||
public static interface QuotasVisitor extends UserQuotasVisitor, TableQuotasVisitor,
|
||||
NamespaceQuotasVisitor, RegionServerQuotasVisitor {
|
||||
}
|
||||
|
||||
public static void parseResult(final Result result, final QuotasVisitor visitor)
|
||||
|
@ -329,6 +355,8 @@ public class QuotaTableUtil {
|
|||
parseTableResult(result, visitor);
|
||||
} else if (isUserRowKey(row)) {
|
||||
parseUserResult(result, visitor);
|
||||
} else if (isRegionServerRowKey(row)) {
|
||||
parseRegionServerResult(result, visitor);
|
||||
} else {
|
||||
LOG.warn("unexpected row-key: " + Bytes.toString(row));
|
||||
}
|
||||
|
@ -362,6 +390,11 @@ public class QuotaTableUtil {
|
|||
public void visitNamespaceQuotas(String namespace, Quotas quotas) {
|
||||
quotaSettings.addAll(QuotaSettingsFactory.fromNamespaceQuotas(namespace, quotas));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void visitRegionServerQuotas(String regionServer, Quotas quotas) {
|
||||
quotaSettings.addAll(QuotaSettingsFactory.fromRegionServerQuotas(regionServer, quotas));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -380,6 +413,21 @@ public class QuotaTableUtil {
|
|||
}
|
||||
}
|
||||
|
||||
private static void parseRegionServerResult(final Result result,
|
||||
final RegionServerQuotasVisitor visitor) throws IOException {
|
||||
String rs = getRegionServerFromRowKey(result.getRow());
|
||||
parseRegionServerResult(rs, result, visitor);
|
||||
}
|
||||
|
||||
private static void parseRegionServerResult(final String regionServer, final Result result,
|
||||
final RegionServerQuotasVisitor visitor) throws IOException {
|
||||
byte[] data = result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
|
||||
if (data != null) {
|
||||
Quotas quotas = quotasFromData(data);
|
||||
visitor.visitRegionServerQuotas(regionServer, quotas);
|
||||
}
|
||||
}
|
||||
|
||||
public static void parseTableResult(final Result result, final TableQuotasVisitor visitor)
|
||||
throws IOException {
|
||||
TableName table = getTableFromRowKey(result.getRow());
|
||||
|
@ -619,6 +667,10 @@ public class QuotaTableUtil {
|
|||
return Bytes.add(QUOTA_NAMESPACE_ROW_KEY_PREFIX, Bytes.toBytes(namespace));
|
||||
}
|
||||
|
||||
protected static byte[] getRegionServerRowKey(final String regionServer) {
|
||||
return Bytes.add(QUOTA_REGION_SERVER_ROW_KEY_PREFIX, Bytes.toBytes(regionServer));
|
||||
}
|
||||
|
||||
protected static byte[] getSettingsQualifierForUserTable(final TableName tableName) {
|
||||
return Bytes.add(QUOTA_QUALIFIER_SETTINGS_PREFIX, tableName.getName());
|
||||
}
|
||||
|
@ -640,6 +692,10 @@ public class QuotaTableUtil {
|
|||
return getRowKeyRegEx(QUOTA_NAMESPACE_ROW_KEY_PREFIX, namespace);
|
||||
}
|
||||
|
||||
private static String getRegionServerRowKeyRegex(final String regionServer) {
|
||||
return getRowKeyRegEx(QUOTA_REGION_SERVER_ROW_KEY_PREFIX, regionServer);
|
||||
}
|
||||
|
||||
private static String getRowKeyRegEx(final byte[] prefix, final String regex) {
|
||||
return '^' + Pattern.quote(Bytes.toString(prefix)) + regex + '$';
|
||||
}
|
||||
|
@ -662,6 +718,14 @@ public class QuotaTableUtil {
|
|||
return Bytes.toString(key, QUOTA_NAMESPACE_ROW_KEY_PREFIX.length);
|
||||
}
|
||||
|
||||
protected static boolean isRegionServerRowKey(final byte[] key) {
|
||||
return Bytes.startsWith(key, QUOTA_REGION_SERVER_ROW_KEY_PREFIX);
|
||||
}
|
||||
|
||||
protected static String getRegionServerFromRowKey(final byte[] key) {
|
||||
return Bytes.toString(key, QUOTA_REGION_SERVER_ROW_KEY_PREFIX.length);
|
||||
}
|
||||
|
||||
protected static boolean isTableRowKey(final byte[] key) {
|
||||
return Bytes.startsWith(key, QUOTA_TABLE_ROW_KEY_PREFIX);
|
||||
}
|
||||
|
|
|
@ -37,7 +37,7 @@ class SpaceLimitSettings extends QuotaSettings {
|
|||
private final SpaceLimitRequest proto;
|
||||
|
||||
SpaceLimitSettings(TableName tableName, long sizeLimit, SpaceViolationPolicy violationPolicy) {
|
||||
super(null, Objects.requireNonNull(tableName), null);
|
||||
super(null, Objects.requireNonNull(tableName), null, null);
|
||||
validateSizeLimit(sizeLimit);
|
||||
proto = buildProtoAddQuota(sizeLimit, Objects.requireNonNull(violationPolicy));
|
||||
}
|
||||
|
@ -46,12 +46,12 @@ class SpaceLimitSettings extends QuotaSettings {
|
|||
* Constructs a {@code SpaceLimitSettings} to remove a space quota on the given {@code tableName}.
|
||||
*/
|
||||
SpaceLimitSettings(TableName tableName) {
|
||||
super(null, Objects.requireNonNull(tableName), null);
|
||||
super(null, Objects.requireNonNull(tableName), null, null);
|
||||
proto = buildProtoRemoveQuota();
|
||||
}
|
||||
|
||||
SpaceLimitSettings(String namespace, long sizeLimit, SpaceViolationPolicy violationPolicy) {
|
||||
super(null, null, Objects.requireNonNull(namespace));
|
||||
super(null, null, Objects.requireNonNull(namespace), null);
|
||||
validateSizeLimit(sizeLimit);
|
||||
proto = buildProtoAddQuota(sizeLimit, Objects.requireNonNull(violationPolicy));
|
||||
}
|
||||
|
@ -60,12 +60,12 @@ class SpaceLimitSettings extends QuotaSettings {
|
|||
* Constructs a {@code SpaceLimitSettings} to remove a space quota on the given {@code namespace}.
|
||||
*/
|
||||
SpaceLimitSettings(String namespace) {
|
||||
super(null, null, Objects.requireNonNull(namespace));
|
||||
super(null, null, Objects.requireNonNull(namespace), null);
|
||||
proto = buildProtoRemoveQuota();
|
||||
}
|
||||
|
||||
SpaceLimitSettings(TableName tableName, String namespace, SpaceLimitRequest req) {
|
||||
super(null, tableName, namespace);
|
||||
super(null, tableName, namespace, null);
|
||||
proto = req;
|
||||
}
|
||||
|
||||
|
|
|
@ -25,19 +25,19 @@ import org.apache.hadoop.hbase.TableName;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
class ThrottleSettings extends QuotaSettings {
|
||||
final QuotaProtos.ThrottleRequest proto;
|
||||
|
||||
ThrottleSettings(final String userName, final TableName tableName,
|
||||
final String namespace, final QuotaProtos.ThrottleRequest proto) {
|
||||
super(userName, tableName, namespace);
|
||||
ThrottleSettings(final String userName, final TableName tableName, final String namespace,
|
||||
final String regionServer, final QuotaProtos.ThrottleRequest proto) {
|
||||
super(userName, tableName, namespace, regionServer);
|
||||
this.proto = proto;
|
||||
}
|
||||
|
||||
|
@ -146,7 +146,8 @@ class ThrottleSettings extends QuotaSettings {
|
|||
|
||||
QuotaProtos.ThrottleRequest mergedReq = builder.setTimedQuota(
|
||||
timedQuotaBuilder.build()).build();
|
||||
return new ThrottleSettings(getUserName(), getTableName(), getNamespace(), mergedReq);
|
||||
return new ThrottleSettings(getUserName(), getTableName(), getNamespace(),
|
||||
getRegionServer(), mergedReq);
|
||||
}
|
||||
}
|
||||
return this;
|
||||
|
@ -159,12 +160,12 @@ class ThrottleSettings extends QuotaSettings {
|
|||
}
|
||||
}
|
||||
|
||||
static ThrottleSettings fromTimedQuota(final String userName,
|
||||
final TableName tableName, final String namespace,
|
||||
ThrottleType type, QuotaProtos.TimedQuota timedQuota) {
|
||||
static ThrottleSettings fromTimedQuota(final String userName, final TableName tableName,
|
||||
final String namespace, final String regionServer, ThrottleType type,
|
||||
QuotaProtos.TimedQuota timedQuota) {
|
||||
QuotaProtos.ThrottleRequest.Builder builder = QuotaProtos.ThrottleRequest.newBuilder();
|
||||
builder.setType(ProtobufUtil.toProtoThrottleType(type));
|
||||
builder.setTimedQuota(timedQuota);
|
||||
return new ThrottleSettings(userName, tableName, namespace, builder.build());
|
||||
return new ThrottleSettings(userName, tableName, namespace, regionServer, builder.build());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,67 +39,103 @@ public class TestQuotaGlobalsSettingsBypass {
|
|||
|
||||
@Test
|
||||
public void testMerge() throws IOException {
|
||||
QuotaGlobalsSettingsBypass orig = new QuotaGlobalsSettingsBypass("joe", null, null, true);
|
||||
assertFalse(orig.merge(new QuotaGlobalsSettingsBypass(
|
||||
"joe", null, null, false)).getBypass());
|
||||
QuotaGlobalsSettingsBypass orig = new QuotaGlobalsSettingsBypass("joe", null, null, null, true);
|
||||
assertFalse(
|
||||
orig.merge(new QuotaGlobalsSettingsBypass("joe", null, null, null, false)).getBypass());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidMerges() throws IOException {
|
||||
QuotaGlobalsSettingsBypass userBypass = new QuotaGlobalsSettingsBypass(
|
||||
"joe", null, null, true);
|
||||
QuotaGlobalsSettingsBypass tableBypass = new QuotaGlobalsSettingsBypass(
|
||||
null, TableName.valueOf("table"), null, true);
|
||||
QuotaGlobalsSettingsBypass namespaceBypass = new QuotaGlobalsSettingsBypass(
|
||||
null, null, "ns", true);
|
||||
QuotaGlobalsSettingsBypass userOnTableBypass = new QuotaGlobalsSettingsBypass(
|
||||
"joe", TableName.valueOf("table"), null, true);
|
||||
QuotaGlobalsSettingsBypass userOnNamespaceBypass = new QuotaGlobalsSettingsBypass(
|
||||
"joe", null, "ns", true);
|
||||
QuotaGlobalsSettingsBypass userBypass =
|
||||
new QuotaGlobalsSettingsBypass("joe", null, null, null, true);
|
||||
QuotaGlobalsSettingsBypass tableBypass =
|
||||
new QuotaGlobalsSettingsBypass(null, TableName.valueOf("table"), null, null, true);
|
||||
QuotaGlobalsSettingsBypass namespaceBypass =
|
||||
new QuotaGlobalsSettingsBypass(null, null, "ns", null, true);
|
||||
QuotaGlobalsSettingsBypass regionServerBypass =
|
||||
new QuotaGlobalsSettingsBypass(null, null, null, "all", true);
|
||||
QuotaGlobalsSettingsBypass userOnTableBypass =
|
||||
new QuotaGlobalsSettingsBypass("joe", TableName.valueOf("table"), null, null, true);
|
||||
QuotaGlobalsSettingsBypass userOnNamespaceBypass =
|
||||
new QuotaGlobalsSettingsBypass("joe", null, "ns", null, true);
|
||||
QuotaGlobalsSettingsBypass userOnRegionServerBypass =
|
||||
new QuotaGlobalsSettingsBypass("joe", null, null, "all", true);
|
||||
|
||||
assertTrue(userBypass.merge(userBypass).getBypass());
|
||||
expectFailure(userBypass, new QuotaGlobalsSettingsBypass("frank", null, null, false));
|
||||
expectFailure(userBypass, new QuotaGlobalsSettingsBypass("frank", null, null, null, false));
|
||||
expectFailure(userBypass, tableBypass);
|
||||
expectFailure(userBypass, namespaceBypass);
|
||||
expectFailure(userBypass, regionServerBypass);
|
||||
expectFailure(userBypass, userOnTableBypass);
|
||||
expectFailure(userBypass, userOnNamespaceBypass);
|
||||
expectFailure(userBypass, userOnRegionServerBypass);
|
||||
|
||||
assertTrue(tableBypass.merge(tableBypass).getBypass());
|
||||
expectFailure(tableBypass, userBypass);
|
||||
expectFailure(tableBypass, new QuotaGlobalsSettingsBypass(
|
||||
null, TableName.valueOf("foo"), null, false));
|
||||
expectFailure(tableBypass,
|
||||
new QuotaGlobalsSettingsBypass(null, TableName.valueOf("foo"), null, null, false));
|
||||
expectFailure(tableBypass, namespaceBypass);
|
||||
expectFailure(tableBypass, regionServerBypass);
|
||||
expectFailure(tableBypass, userOnTableBypass);
|
||||
expectFailure(tableBypass, userOnNamespaceBypass);
|
||||
expectFailure(tableBypass, userOnRegionServerBypass);
|
||||
|
||||
assertTrue(namespaceBypass.merge(namespaceBypass).getBypass());
|
||||
expectFailure(namespaceBypass, userBypass);
|
||||
expectFailure(namespaceBypass, tableBypass);
|
||||
expectFailure(namespaceBypass, new QuotaGlobalsSettingsBypass(null, null, "sn", false));
|
||||
expectFailure(namespaceBypass, regionServerBypass);
|
||||
expectFailure(namespaceBypass, new QuotaGlobalsSettingsBypass(null, null, "sn", null, false));
|
||||
expectFailure(namespaceBypass, userOnTableBypass);
|
||||
expectFailure(namespaceBypass, userOnNamespaceBypass);
|
||||
expectFailure(namespaceBypass, userOnNamespaceBypass);
|
||||
|
||||
assertTrue(regionServerBypass.merge(regionServerBypass).getBypass());
|
||||
expectFailure(regionServerBypass, userBypass);
|
||||
expectFailure(regionServerBypass, tableBypass);
|
||||
expectFailure(regionServerBypass, namespaceBypass);
|
||||
expectFailure(regionServerBypass,
|
||||
new QuotaGlobalsSettingsBypass(null, null, null, "rs", false));
|
||||
expectFailure(regionServerBypass, userOnTableBypass);
|
||||
expectFailure(regionServerBypass, userOnNamespaceBypass);
|
||||
expectFailure(regionServerBypass, userOnRegionServerBypass);
|
||||
|
||||
assertTrue(userOnTableBypass.merge(userOnTableBypass).getBypass());
|
||||
expectFailure(userOnTableBypass, userBypass);
|
||||
expectFailure(userOnTableBypass, tableBypass);
|
||||
expectFailure(userOnTableBypass, namespaceBypass);
|
||||
expectFailure(userOnTableBypass, regionServerBypass);
|
||||
// Incorrect user
|
||||
expectFailure(userOnTableBypass, new QuotaGlobalsSettingsBypass(
|
||||
"frank", TableName.valueOf("foo"), null, false));
|
||||
expectFailure(userOnTableBypass,
|
||||
new QuotaGlobalsSettingsBypass("frank", TableName.valueOf("foo"), null, null, false));
|
||||
// Incorrect tablename
|
||||
expectFailure(userOnTableBypass, new QuotaGlobalsSettingsBypass(
|
||||
"joe", TableName.valueOf("bar"), null, false));
|
||||
expectFailure(userOnTableBypass,
|
||||
new QuotaGlobalsSettingsBypass("joe", TableName.valueOf("bar"), null, null, false));
|
||||
expectFailure(userOnTableBypass, userOnNamespaceBypass);
|
||||
expectFailure(userOnTableBypass, userOnRegionServerBypass);
|
||||
|
||||
assertTrue(userOnNamespaceBypass.merge(userOnNamespaceBypass).getBypass());
|
||||
expectFailure(userOnNamespaceBypass, userBypass);
|
||||
expectFailure(userOnNamespaceBypass, tableBypass);
|
||||
expectFailure(userOnNamespaceBypass, namespaceBypass);
|
||||
expectFailure(userOnNamespaceBypass, regionServerBypass);
|
||||
expectFailure(userOnNamespaceBypass, userOnTableBypass);
|
||||
expectFailure(userOnNamespaceBypass, new QuotaGlobalsSettingsBypass(
|
||||
"frank", null, "ns", false));
|
||||
expectFailure(userOnNamespaceBypass, new QuotaGlobalsSettingsBypass(
|
||||
"joe", null, "sn", false));
|
||||
expectFailure(userOnNamespaceBypass,
|
||||
new QuotaGlobalsSettingsBypass("frank", null, "ns", null, false));
|
||||
expectFailure(userOnNamespaceBypass,
|
||||
new QuotaGlobalsSettingsBypass("joe", null, "sn", null, false));
|
||||
expectFailure(userOnNamespaceBypass, userOnRegionServerBypass);
|
||||
|
||||
assertTrue(userOnRegionServerBypass.merge(userOnRegionServerBypass).getBypass());
|
||||
expectFailure(userOnRegionServerBypass, userBypass);
|
||||
expectFailure(userOnRegionServerBypass, tableBypass);
|
||||
expectFailure(userOnRegionServerBypass, namespaceBypass);
|
||||
expectFailure(userOnRegionServerBypass, regionServerBypass);
|
||||
expectFailure(userOnRegionServerBypass, userOnTableBypass);
|
||||
expectFailure(userOnRegionServerBypass, userOnNamespaceBypass);
|
||||
expectFailure(userOnRegionServerBypass,
|
||||
new QuotaGlobalsSettingsBypass("frank", null, null, "all", false));
|
||||
expectFailure(userOnRegionServerBypass,
|
||||
new QuotaGlobalsSettingsBypass("joe", null, null, "rs", false));
|
||||
}
|
||||
|
||||
void expectFailure(QuotaSettings one, QuotaSettings two) throws IOException {
|
||||
|
|
|
@ -87,6 +87,7 @@ public class TestQuotaSettingsFactory {
|
|||
assertEquals(tn, throttleSettings.getTableName());
|
||||
assertNull("Username should be null", throttleSettings.getUserName());
|
||||
assertNull("Namespace should be null", throttleSettings.getNamespace());
|
||||
assertNull("RegionServer should be null", throttleSettings.getRegionServer());
|
||||
seenRead = true;
|
||||
break;
|
||||
case WRITE_NUMBER:
|
||||
|
@ -96,6 +97,7 @@ public class TestQuotaSettingsFactory {
|
|||
assertEquals(tn, throttleSettings.getTableName());
|
||||
assertNull("Username should be null", throttleSettings.getUserName());
|
||||
assertNull("Namespace should be null", throttleSettings.getNamespace());
|
||||
assertNull("RegionServer should be null", throttleSettings.getRegionServer());
|
||||
seenWrite = true;
|
||||
break;
|
||||
default:
|
||||
|
@ -107,6 +109,7 @@ public class TestQuotaSettingsFactory {
|
|||
assertEquals(tn, spaceLimit.getTableName());
|
||||
assertNull("Username should be null", spaceLimit.getUserName());
|
||||
assertNull("Namespace should be null", spaceLimit.getNamespace());
|
||||
assertNull("RegionServer should be null", spaceLimit.getRegionServer());
|
||||
assertTrue("SpaceLimitSettings should have a SpaceQuota", spaceLimit.getProto().hasQuota());
|
||||
assertEquals(spaceQuota, spaceLimit.getProto().getQuota());
|
||||
seenSpace = true;
|
||||
|
|
|
@ -48,7 +48,7 @@ public class TestThrottleSettings {
|
|||
.setTimeUnit(HBaseProtos.TimeUnit.MINUTES).build();
|
||||
ThrottleRequest tr1 = ThrottleRequest.newBuilder().setTimedQuota(tq1)
|
||||
.setType(QuotaProtos.ThrottleType.REQUEST_NUMBER).build();
|
||||
ThrottleSettings orig = new ThrottleSettings("joe", null, null, tr1);
|
||||
ThrottleSettings orig = new ThrottleSettings("joe", null, null, null, tr1);
|
||||
|
||||
TimedQuota tq2 = TimedQuota.newBuilder().setSoftLimit(10)
|
||||
.setScope(QuotaProtos.QuotaScope.MACHINE)
|
||||
|
@ -56,7 +56,7 @@ public class TestThrottleSettings {
|
|||
ThrottleRequest tr2 = ThrottleRequest.newBuilder().setTimedQuota(tq2)
|
||||
.setType(QuotaProtos.ThrottleType.REQUEST_NUMBER).build();
|
||||
|
||||
ThrottleSettings merged = orig.merge(new ThrottleSettings("joe", null, null, tr2));
|
||||
ThrottleSettings merged = orig.merge(new ThrottleSettings("joe", null, null, null, tr2));
|
||||
|
||||
assertEquals(10, merged.getSoftLimit());
|
||||
assertEquals(ThrottleType.REQUEST_NUMBER, merged.getThrottleType());
|
||||
|
@ -70,7 +70,7 @@ public class TestThrottleSettings {
|
|||
.setTimeUnit(HBaseProtos.TimeUnit.MINUTES).build();
|
||||
ThrottleRequest requestsQuotaReq = ThrottleRequest.newBuilder().setTimedQuota(requestsQuota)
|
||||
.setType(QuotaProtos.ThrottleType.REQUEST_NUMBER).build();
|
||||
ThrottleSettings orig = new ThrottleSettings("joe", null, null, requestsQuotaReq);
|
||||
ThrottleSettings orig = new ThrottleSettings("joe", null, null, null, requestsQuotaReq);
|
||||
|
||||
TimedQuota readsQuota = TimedQuota.newBuilder().setSoftLimit(10)
|
||||
.setScope(QuotaProtos.QuotaScope.MACHINE)
|
||||
|
@ -79,7 +79,7 @@ public class TestThrottleSettings {
|
|||
.setType(QuotaProtos.ThrottleType.READ_NUMBER).build();
|
||||
|
||||
try {
|
||||
orig.merge(new ThrottleSettings("joe", null, null, readsQuotaReq));
|
||||
orig.merge(new ThrottleSettings("joe", null, null, null, readsQuotaReq));
|
||||
fail("A read throttle should not be capable of being merged with a request quota");
|
||||
} catch (IllegalArgumentException e) {
|
||||
// Pass
|
||||
|
@ -93,13 +93,13 @@ public class TestThrottleSettings {
|
|||
.setTimeUnit(HBaseProtos.TimeUnit.MINUTES).build();
|
||||
ThrottleRequest tr1 = ThrottleRequest.newBuilder().setTimedQuota(tq1)
|
||||
.setType(QuotaProtos.ThrottleType.REQUEST_NUMBER).build();
|
||||
ThrottleSettings orig = new ThrottleSettings("joe", null, null, tr1);
|
||||
ThrottleSettings orig = new ThrottleSettings("joe", null, null, null, tr1);
|
||||
|
||||
ThrottleRequest tr2 = ThrottleRequest.newBuilder()
|
||||
.setType(QuotaProtos.ThrottleType.REQUEST_NUMBER).build();
|
||||
|
||||
assertTrue(
|
||||
"The same object should be returned by merge, but it wasn't",
|
||||
orig == orig.merge(new ThrottleSettings("joe", null, null, tr2)));
|
||||
orig == orig.merge(new ThrottleSettings("joe", null, null, null, tr2)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -572,6 +572,7 @@ message SetQuotaRequest {
|
|||
optional ThrottleRequest throttle = 7;
|
||||
|
||||
optional SpaceLimitRequest space_limit = 8;
|
||||
optional string region_server = 9;
|
||||
}
|
||||
|
||||
message SetQuotaResponse {
|
||||
|
|
|
@ -1090,6 +1090,24 @@ public interface MasterObserver {
|
|||
default void postSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
final String namespace, final GlobalQuotaSettings quotas) throws IOException {}
|
||||
|
||||
/**
|
||||
* Called before the quota for the region server is stored.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param regionServer the name of the region server
|
||||
* @param quotas the current quota for the region server
|
||||
*/
|
||||
default void preSetRegionServerQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
final String regionServer, final GlobalQuotaSettings quotas) throws IOException {}
|
||||
|
||||
/**
|
||||
* Called after the quota for the region server is stored.
|
||||
* @param ctx the environment to interact with the framework and master
|
||||
* @param regionServer the name of the region server
|
||||
* @param quotas the resulting quota for the region server
|
||||
*/
|
||||
default void postSetRegionServerQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
final String regionServer, final GlobalQuotaSettings quotas) throws IOException {}
|
||||
|
||||
/**
|
||||
* Called before merge regions request.
|
||||
* @param ctx coprocessor environment
|
||||
|
|
|
@ -1283,6 +1283,26 @@ public class MasterCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
public void preSetRegionServerQuota(final String regionServer, final GlobalQuotaSettings quotas)
|
||||
throws IOException {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preSetRegionServerQuota(this, regionServer, quotas);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void postSetRegionServerQuota(final String regionServer, final GlobalQuotaSettings quotas)
|
||||
throws IOException {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.postSetRegionServerQuota(this, regionServer, quotas);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void preMoveServersAndTables(final Set<Address> servers, final Set<TableName> tables,
|
||||
final String targetGroup) throws IOException {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
|
|
|
@ -35,8 +35,9 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
|||
@InterfaceStability.Evolving
|
||||
public abstract class GlobalQuotaSettings extends QuotaSettings {
|
||||
|
||||
protected GlobalQuotaSettings(String userName, TableName tableName, String namespace) {
|
||||
super(userName, tableName, namespace);
|
||||
protected GlobalQuotaSettings(String userName, TableName tableName, String namespace,
|
||||
String regionServer) {
|
||||
super(userName, tableName, namespace, regionServer);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -27,13 +27,14 @@ import java.util.Map.Entry;
|
|||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.quotas.QuotaSettingsFactory.QuotaGlobalsSettingsBypass;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.SpaceQuota;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Implementation of {@link GlobalQuotaSettings} to hide the Protobuf messages we use internally.
|
||||
|
@ -45,18 +46,18 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings {
|
|||
private final Boolean bypassGlobals;
|
||||
private final QuotaProtos.SpaceQuota spaceProto;
|
||||
|
||||
protected GlobalQuotaSettingsImpl(
|
||||
String username, TableName tableName, String namespace, QuotaProtos.Quotas quotas) {
|
||||
this(username, tableName, namespace,
|
||||
protected GlobalQuotaSettingsImpl(String username, TableName tableName, String namespace,
|
||||
String regionServer, QuotaProtos.Quotas quotas) {
|
||||
this(username, tableName, namespace, regionServer,
|
||||
(quotas != null && quotas.hasThrottle() ? quotas.getThrottle() : null),
|
||||
(quotas != null && quotas.hasBypassGlobals() ? quotas.getBypassGlobals() : null),
|
||||
(quotas != null && quotas.hasSpace() ? quotas.getSpace() : null));
|
||||
}
|
||||
|
||||
protected GlobalQuotaSettingsImpl(
|
||||
String userName, TableName tableName, String namespace, QuotaProtos.Throttle throttleProto,
|
||||
Boolean bypassGlobals, QuotaProtos.SpaceQuota spaceProto) {
|
||||
super(userName, tableName, namespace);
|
||||
protected GlobalQuotaSettingsImpl(String userName, TableName tableName, String namespace,
|
||||
String regionServer, QuotaProtos.Throttle throttleProto, Boolean bypassGlobals,
|
||||
QuotaProtos.SpaceQuota spaceProto) {
|
||||
super(userName, tableName, namespace, regionServer);
|
||||
this.throttleProto = throttleProto;
|
||||
this.bypassGlobals = bypassGlobals;
|
||||
this.spaceProto = spaceProto;
|
||||
|
@ -67,12 +68,12 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings {
|
|||
// Very similar to QuotaSettingsFactory
|
||||
List<QuotaSettings> settings = new ArrayList<>();
|
||||
if (throttleProto != null) {
|
||||
settings.addAll(QuotaSettingsFactory.fromThrottle(
|
||||
getUserName(), getTableName(), getNamespace(), throttleProto));
|
||||
settings.addAll(QuotaSettingsFactory.fromThrottle(getUserName(), getTableName(),
|
||||
getNamespace(), getRegionServer(), throttleProto));
|
||||
}
|
||||
if (bypassGlobals != null && bypassGlobals.booleanValue()) {
|
||||
settings.add(new QuotaGlobalsSettingsBypass(
|
||||
getUserName(), getTableName(), getNamespace(), true));
|
||||
settings.add(new QuotaGlobalsSettingsBypass(getUserName(), getTableName(), getNamespace(),
|
||||
getRegionServer(), true));
|
||||
}
|
||||
if (spaceProto != null) {
|
||||
settings.add(QuotaSettingsFactory.fromSpace(getTableName(), getNamespace(), spaceProto));
|
||||
|
@ -210,7 +211,7 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings {
|
|||
}
|
||||
|
||||
return new GlobalQuotaSettingsImpl(
|
||||
getUserName(), getTableName(), getNamespace(),
|
||||
getUserName(), getTableName(), getNamespace(), getRegionServer(),
|
||||
(throttleBuilder == null ? null : throttleBuilder.build()), bypassGlobals,
|
||||
(removeSpaceBuilder ? null : spaceBuilder.build()));
|
||||
}
|
||||
|
|
|
@ -72,6 +72,7 @@ public class MasterQuotaManager implements RegionStateListener {
|
|||
private NamedLock<String> namespaceLocks;
|
||||
private NamedLock<TableName> tableLocks;
|
||||
private NamedLock<String> userLocks;
|
||||
private NamedLock<String> regionServerLocks;
|
||||
private boolean initialized = false;
|
||||
private NamespaceAuditor namespaceQuotaManager;
|
||||
private ConcurrentHashMap<RegionInfo, SizeSnapshotWithTimestamp> regionSizes;
|
||||
|
@ -100,6 +101,7 @@ public class MasterQuotaManager implements RegionStateListener {
|
|||
namespaceLocks = new NamedLock<>();
|
||||
tableLocks = new NamedLock<>();
|
||||
userLocks = new NamedLock<>();
|
||||
regionServerLocks = new NamedLock<>();
|
||||
regionSizes = new ConcurrentHashMap<>();
|
||||
|
||||
namespaceQuotaManager = new NamespaceAuditor(masterServices);
|
||||
|
@ -152,9 +154,16 @@ public class MasterQuotaManager implements RegionStateListener {
|
|||
} finally {
|
||||
namespaceLocks.unlock(req.getNamespace());
|
||||
}
|
||||
} else if (req.hasRegionServer()) {
|
||||
regionServerLocks.lock(req.getRegionServer());
|
||||
try {
|
||||
setRegionServerQuota(req.getRegionServer(), req);
|
||||
} finally {
|
||||
regionServerLocks.unlock(req.getRegionServer());
|
||||
}
|
||||
} else {
|
||||
throw new DoNotRetryIOException(
|
||||
new UnsupportedOperationException("a user, a table or a namespace must be specified"));
|
||||
throw new DoNotRetryIOException(new UnsupportedOperationException(
|
||||
"a user, a table, a namespace or region server must be specified"));
|
||||
}
|
||||
return SetQuotaResponse.newBuilder().build();
|
||||
}
|
||||
|
@ -164,8 +173,8 @@ public class MasterQuotaManager implements RegionStateListener {
|
|||
setQuota(req, new SetQuotaOperations() {
|
||||
@Override
|
||||
public GlobalQuotaSettingsImpl fetch() throws IOException {
|
||||
return new GlobalQuotaSettingsImpl(req.getUserName(), null, null, QuotaUtil.getUserQuota(
|
||||
masterServices.getConnection(), userName));
|
||||
return new GlobalQuotaSettingsImpl(req.getUserName(), null, null, null,
|
||||
QuotaUtil.getUserQuota(masterServices.getConnection(), userName));
|
||||
}
|
||||
@Override
|
||||
public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
|
||||
|
@ -191,8 +200,8 @@ public class MasterQuotaManager implements RegionStateListener {
|
|||
setQuota(req, new SetQuotaOperations() {
|
||||
@Override
|
||||
public GlobalQuotaSettingsImpl fetch() throws IOException {
|
||||
return new GlobalQuotaSettingsImpl(userName, table, null, QuotaUtil.getUserQuota(
|
||||
masterServices.getConnection(), userName, table));
|
||||
return new GlobalQuotaSettingsImpl(userName, table, null, null,
|
||||
QuotaUtil.getUserQuota(masterServices.getConnection(), userName, table));
|
||||
}
|
||||
@Override
|
||||
public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
|
||||
|
@ -219,8 +228,8 @@ public class MasterQuotaManager implements RegionStateListener {
|
|||
setQuota(req, new SetQuotaOperations() {
|
||||
@Override
|
||||
public GlobalQuotaSettingsImpl fetch() throws IOException {
|
||||
return new GlobalQuotaSettingsImpl(userName, null, namespace, QuotaUtil.getUserQuota(
|
||||
masterServices.getConnection(), userName, namespace));
|
||||
return new GlobalQuotaSettingsImpl(userName, null, namespace, null,
|
||||
QuotaUtil.getUserQuota(masterServices.getConnection(), userName, namespace));
|
||||
}
|
||||
@Override
|
||||
public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
|
||||
|
@ -249,8 +258,8 @@ public class MasterQuotaManager implements RegionStateListener {
|
|||
setQuota(req, new SetQuotaOperations() {
|
||||
@Override
|
||||
public GlobalQuotaSettingsImpl fetch() throws IOException {
|
||||
return new GlobalQuotaSettingsImpl(null, table, null, QuotaUtil.getTableQuota(
|
||||
masterServices.getConnection(), table));
|
||||
return new GlobalQuotaSettingsImpl(null, table, null, null,
|
||||
QuotaUtil.getTableQuota(masterServices.getConnection(), table));
|
||||
}
|
||||
@Override
|
||||
public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
|
||||
|
@ -276,8 +285,8 @@ public class MasterQuotaManager implements RegionStateListener {
|
|||
setQuota(req, new SetQuotaOperations() {
|
||||
@Override
|
||||
public GlobalQuotaSettingsImpl fetch() throws IOException {
|
||||
return new GlobalQuotaSettingsImpl(null, null, namespace, QuotaUtil.getNamespaceQuota(
|
||||
masterServices.getConnection(), namespace));
|
||||
return new GlobalQuotaSettingsImpl(null, null, namespace, null,
|
||||
QuotaUtil.getNamespaceQuota(masterServices.getConnection(), namespace));
|
||||
}
|
||||
@Override
|
||||
public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
|
||||
|
@ -299,6 +308,38 @@ public class MasterQuotaManager implements RegionStateListener {
|
|||
});
|
||||
}
|
||||
|
||||
public void setRegionServerQuota(final String regionServer, final SetQuotaRequest req)
|
||||
throws IOException, InterruptedException {
|
||||
setQuota(req, new SetQuotaOperations() {
|
||||
@Override
|
||||
public GlobalQuotaSettingsImpl fetch() throws IOException {
|
||||
return new GlobalQuotaSettingsImpl(null, null, null, regionServer,
|
||||
QuotaUtil.getRegionServerQuota(masterServices.getConnection(), regionServer));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
|
||||
QuotaUtil.addRegionServerQuota(masterServices.getConnection(), regionServer,
|
||||
((GlobalQuotaSettingsImpl) quotaPojo).toQuotas());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete() throws IOException {
|
||||
QuotaUtil.deleteRegionServerQuota(masterServices.getConnection(), regionServer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
|
||||
masterServices.getMasterCoprocessorHost().preSetRegionServerQuota(regionServer, quotaPojo);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postApply(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
|
||||
masterServices.getMasterCoprocessorHost().postSetRegionServerQuota(regionServer, quotaPojo);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void setNamespaceQuota(NamespaceDescriptor desc) throws IOException {
|
||||
if (initialized) {
|
||||
this.namespaceQuotaManager.addNamespace(desc);
|
||||
|
|
|
@ -34,14 +34,14 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.ScheduledChore;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Cache that keeps track of the quota settings for the users and tables that
|
||||
|
@ -69,6 +69,8 @@ public class QuotaCache implements Stoppable {
|
|||
private final ConcurrentHashMap<String, QuotaState> namespaceQuotaCache = new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<TableName, QuotaState> tableQuotaCache = new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<String, QuotaState> regionServerQuotaCache =
|
||||
new ConcurrentHashMap<>();
|
||||
private final RegionServerServices rsServices;
|
||||
|
||||
private QuotaRefresherChore refreshChore;
|
||||
|
@ -146,6 +148,16 @@ public class QuotaCache implements Stoppable {
|
|||
return getQuotaState(this.namespaceQuotaCache, namespace).getGlobalLimiter();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the limiter associated to the specified region server.
|
||||
*
|
||||
* @param regionServer the region server to limit
|
||||
* @return the limiter associated to the specified region server
|
||||
*/
|
||||
public QuotaLimiter getRegionServerQuotaLimiter(final String regionServer) {
|
||||
return getQuotaState(this.regionServerQuotaCache, regionServer).getGlobalLimiter();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the QuotaState requested. If the quota info is not in cache an empty one will be
|
||||
* returned and the quota request will be enqueued for the next cache refresh.
|
||||
|
@ -170,6 +182,11 @@ public class QuotaCache implements Stoppable {
|
|||
return namespaceQuotaCache;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Map<String, QuotaState> getRegionServerQuotaCache() {
|
||||
return regionServerQuotaCache;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Map<TableName, QuotaState> getTableQuotaCache() {
|
||||
return tableQuotaCache;
|
||||
|
@ -203,10 +220,13 @@ public class QuotaCache implements Stoppable {
|
|||
QuotaCache.this.namespaceQuotaCache.putIfAbsent(ns, new QuotaState());
|
||||
}
|
||||
}
|
||||
QuotaCache.this.regionServerQuotaCache.putIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY,
|
||||
new QuotaState());
|
||||
|
||||
fetchNamespaceQuotaState();
|
||||
fetchTableQuotaState();
|
||||
fetchUserQuotaState();
|
||||
fetchRegionServerQuotaState();
|
||||
lastUpdate = EnvironmentEdgeManager.currentTime();
|
||||
}
|
||||
|
||||
|
@ -257,6 +277,21 @@ public class QuotaCache implements Stoppable {
|
|||
});
|
||||
}
|
||||
|
||||
private void fetchRegionServerQuotaState() {
|
||||
fetch("regionServer", QuotaCache.this.regionServerQuotaCache,
|
||||
new Fetcher<String, QuotaState>() {
|
||||
@Override
|
||||
public Get makeGet(final Map.Entry<String, QuotaState> entry) {
|
||||
return QuotaUtil.makeGetForRegionServerQuotas(entry.getKey());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, QuotaState> fetchEntries(final List<Get> gets) throws IOException {
|
||||
return QuotaUtil.fetchRegionServerQuotas(rsServices.getConnection(), gets);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private <K, V extends QuotaState> void fetch(final String type,
|
||||
final ConcurrentHashMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
|
|
|
@ -29,10 +29,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
|
@ -40,10 +36,15 @@ import org.apache.hadoop.hbase.client.Mutation;
|
|||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
||||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
||||
|
||||
/**
|
||||
* Helper class to interact with the quota table
|
||||
|
@ -142,6 +143,16 @@ public class QuotaUtil extends QuotaTableUtil {
|
|||
getSettingsQualifierForUserNamespace(namespace));
|
||||
}
|
||||
|
||||
public static void addRegionServerQuota(final Connection connection, final String regionServer,
|
||||
final Quotas data) throws IOException {
|
||||
addQuotas(connection, getRegionServerRowKey(regionServer), data);
|
||||
}
|
||||
|
||||
public static void deleteRegionServerQuota(final Connection connection, final String regionServer)
|
||||
throws IOException {
|
||||
deleteQuotas(connection, getRegionServerRowKey(regionServer));
|
||||
}
|
||||
|
||||
private static void addQuotas(final Connection connection, final byte[] rowKey,
|
||||
final Quotas data) throws IOException {
|
||||
addQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS, data);
|
||||
|
@ -232,6 +243,17 @@ public class QuotaUtil extends QuotaTableUtil {
|
|||
});
|
||||
}
|
||||
|
||||
public static Map<String, QuotaState> fetchRegionServerQuotas(final Connection connection,
|
||||
final List<Get> gets) throws IOException {
|
||||
return fetchGlobalQuotas("regionServer", connection, gets, new KeyFromRow<String>() {
|
||||
@Override
|
||||
public String getKeyFromRow(final byte[] row) {
|
||||
assert isRegionServerRowKey(row);
|
||||
return getRegionServerFromRowKey(row);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type,
|
||||
final Connection connection, final List<Get> gets, final KeyFromRow<K> kfr)
|
||||
throws IOException {
|
||||
|
|
|
@ -22,19 +22,19 @@ import java.io.IOException;
|
|||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.ipc.RpcScheduler;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||
import org.apache.hadoop.hbase.regionserver.Region;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
|
||||
|
||||
/**
|
||||
* Region Server Quota Manager.
|
||||
|
@ -135,14 +135,17 @@ public class RegionServerRpcQuotaManager {
|
|||
} else {
|
||||
QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
|
||||
QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table);
|
||||
useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass();
|
||||
QuotaLimiter rsLimiter = quotaCache
|
||||
.getRegionServerQuotaLimiter(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY);
|
||||
useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass() && rsLimiter.isBypass();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" +
|
||||
userLimiter + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter);
|
||||
LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter
|
||||
+ " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter + " rsLimiter="
|
||||
+ rsLimiter);
|
||||
}
|
||||
if (!useNoop) {
|
||||
return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter,
|
||||
tableLimiter, nsLimiter);
|
||||
tableLimiter, nsLimiter, rsLimiter);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2514,6 +2514,12 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
|
|||
requirePermission(ctx, "setNamespaceQuota", Action.ADMIN);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preSetRegionServerQuota(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
final String regionServer, GlobalQuotaSettings quotas) throws IOException {
|
||||
requirePermission(ctx, "setRegionServerQuota", Action.ADMIN);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplicationEndpoint postCreateReplicationEndPoint(
|
||||
ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
|
||||
|
|
|
@ -58,9 +58,9 @@ public class TestGlobalQuotaSettingsImpl {
|
|||
QuotaProtos.ThrottleRequest writeThrottle = QuotaProtos.ThrottleRequest.newBuilder()
|
||||
.setTimedQuota(writeQuota).setType(QuotaProtos.ThrottleType.WRITE_NUMBER).build();
|
||||
|
||||
GlobalQuotaSettingsImpl settings = new GlobalQuotaSettingsImpl("joe", null, null, quota);
|
||||
GlobalQuotaSettingsImpl settings = new GlobalQuotaSettingsImpl("joe", null, null, null, quota);
|
||||
GlobalQuotaSettingsImpl merged = settings.merge(
|
||||
new ThrottleSettings("joe", null, null, writeThrottle));
|
||||
new ThrottleSettings("joe", null, null, null, writeThrottle));
|
||||
|
||||
QuotaProtos.Throttle mergedThrottle = merged.getThrottleProto();
|
||||
// Verify the request throttle is in place
|
||||
|
@ -80,7 +80,7 @@ public class TestGlobalQuotaSettingsImpl {
|
|||
QuotaProtos.Quotas quota = QuotaProtos.Quotas.newBuilder()
|
||||
.setSpace(SPACE_QUOTA).build();
|
||||
|
||||
GlobalQuotaSettingsImpl settings = new GlobalQuotaSettingsImpl(null, tn, null, quota);
|
||||
GlobalQuotaSettingsImpl settings = new GlobalQuotaSettingsImpl(null, tn, null, null, quota);
|
||||
// Switch the violation policy to DISABLE
|
||||
GlobalQuotaSettingsImpl merged = settings.merge(
|
||||
new SpaceLimitSettings(tn, SPACE_QUOTA.getSoftLimit(), SpaceViolationPolicy.DISABLE));
|
||||
|
@ -96,7 +96,7 @@ public class TestGlobalQuotaSettingsImpl {
|
|||
final String ns = "org1";
|
||||
QuotaProtos.Quotas quota = QuotaProtos.Quotas.newBuilder()
|
||||
.setThrottle(THROTTLE).setSpace(SPACE_QUOTA).build();
|
||||
GlobalQuotaSettingsImpl settings = new GlobalQuotaSettingsImpl(null, null, ns, quota);
|
||||
GlobalQuotaSettingsImpl settings = new GlobalQuotaSettingsImpl(null, null, ns, null, quota);
|
||||
|
||||
QuotaProtos.TimedQuota writeQuota = REQUEST_THROTTLE.toBuilder()
|
||||
.setSoftLimit(500).build();
|
||||
|
@ -105,7 +105,7 @@ public class TestGlobalQuotaSettingsImpl {
|
|||
.setTimedQuota(writeQuota).setType(QuotaProtos.ThrottleType.WRITE_NUMBER).build();
|
||||
|
||||
GlobalQuotaSettingsImpl merged = settings.merge(
|
||||
new ThrottleSettings(null, null, ns, writeThrottle));
|
||||
new ThrottleSettings(null, null, ns, null, writeThrottle));
|
||||
GlobalQuotaSettingsImpl finalQuota = merged.merge(new SpaceLimitSettings(
|
||||
ns, SPACE_QUOTA.getSoftLimit(), SpaceViolationPolicy.NO_WRITES_COMPACTIONS));
|
||||
|
||||
|
|
|
@ -174,6 +174,7 @@ public class TestQuotaAdmin {
|
|||
assertEquals(userName, throttle.getUserName());
|
||||
assertEquals(null, throttle.getTableName());
|
||||
assertEquals(null, throttle.getNamespace());
|
||||
assertEquals(null, throttle.getRegionServer());
|
||||
assertEquals(6, throttle.getSoftLimit());
|
||||
assertEquals(TimeUnit.MINUTES, throttle.getTimeUnit());
|
||||
countThrottle++;
|
||||
|
@ -523,6 +524,51 @@ public class TestQuotaAdmin {
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetAndRemoveRegionServerQuota() throws Exception {
|
||||
Admin admin = TEST_UTIL.getAdmin();
|
||||
String regionServer = QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY;
|
||||
QuotaFilter rsFilter = new QuotaFilter().setRegionServerFilter(regionServer);
|
||||
|
||||
admin.setQuota(QuotaSettingsFactory.throttleRegionServer(regionServer,
|
||||
ThrottleType.REQUEST_NUMBER, 10, TimeUnit.MINUTES));
|
||||
assertNumResults(1, rsFilter);
|
||||
// Verify the Quota in the table
|
||||
verifyRecordPresentInQuotaTable(ThrottleType.REQUEST_NUMBER, 10, TimeUnit.MINUTES);
|
||||
|
||||
admin.setQuota(QuotaSettingsFactory.throttleRegionServer(regionServer,
|
||||
ThrottleType.REQUEST_NUMBER, 20, TimeUnit.MINUTES));
|
||||
assertNumResults(1, rsFilter);
|
||||
// Verify the Quota in the table
|
||||
verifyRecordPresentInQuotaTable(ThrottleType.REQUEST_NUMBER, 20, TimeUnit.MINUTES);
|
||||
|
||||
admin.setQuota(QuotaSettingsFactory.throttleRegionServer(regionServer, ThrottleType.READ_NUMBER,
|
||||
30, TimeUnit.SECONDS));
|
||||
int count = 0;
|
||||
QuotaRetriever scanner = QuotaRetriever.open(TEST_UTIL.getConfiguration(), rsFilter);
|
||||
try {
|
||||
for (QuotaSettings settings : scanner) {
|
||||
assertTrue(settings.getQuotaType() == QuotaType.THROTTLE);
|
||||
ThrottleSettings throttleSettings = (ThrottleSettings) settings;
|
||||
assertEquals(regionServer, throttleSettings.getRegionServer());
|
||||
count++;
|
||||
if (throttleSettings.getThrottleType() == ThrottleType.REQUEST_NUMBER) {
|
||||
assertEquals(20, throttleSettings.getSoftLimit());
|
||||
assertEquals(TimeUnit.MINUTES, throttleSettings.getTimeUnit());
|
||||
} else if (throttleSettings.getThrottleType() == ThrottleType.READ_NUMBER) {
|
||||
assertEquals(30, throttleSettings.getSoftLimit());
|
||||
assertEquals(TimeUnit.SECONDS, throttleSettings.getTimeUnit());
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
scanner.close();
|
||||
}
|
||||
assertEquals(2, count);
|
||||
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleRegionServer(regionServer));
|
||||
assertNumResults(0, new QuotaFilter().setRegionServerFilter(regionServer));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRpcThrottleWhenStartup() throws IOException, InterruptedException {
|
||||
TEST_UTIL.getAdmin().switchRpcThrottle(false);
|
||||
|
|
|
@ -570,6 +570,31 @@ public class TestQuotaThrottle {
|
|||
triggerTableCacheRefresh(true, TABLE_NAMES[0]);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegionServerThrottle() throws Exception {
|
||||
final Admin admin = TEST_UTIL.getAdmin();
|
||||
admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0], ThrottleType.WRITE_NUMBER, 5,
|
||||
TimeUnit.MINUTES));
|
||||
|
||||
// requests are throttled by table quota
|
||||
admin.setQuota(QuotaSettingsFactory.throttleRegionServer(
|
||||
QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, ThrottleType.WRITE_NUMBER, 7, TimeUnit.MINUTES));
|
||||
triggerCacheRefresh(false, false, true, false, true, TABLE_NAMES[0]);
|
||||
assertEquals(5, doPuts(10, tables[0]));
|
||||
|
||||
// requests are throttled by region server quota
|
||||
admin.setQuota(QuotaSettingsFactory.throttleRegionServer(
|
||||
QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, ThrottleType.WRITE_NUMBER, 4, TimeUnit.MINUTES));
|
||||
triggerCacheRefresh(false, false, false, false, true, TABLE_NAMES[0]);
|
||||
assertEquals(4, doPuts(10, tables[0]));
|
||||
|
||||
// unthrottle
|
||||
admin.setQuota(
|
||||
QuotaSettingsFactory.unthrottleRegionServer(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY));
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0]));
|
||||
triggerCacheRefresh(true, false, true, false, true, TABLE_NAMES[0]);
|
||||
}
|
||||
|
||||
private int doPuts(int maxOps, final Table... tables) throws Exception {
|
||||
return doPuts(maxOps, -1, tables);
|
||||
}
|
||||
|
@ -622,19 +647,19 @@ public class TestQuotaThrottle {
|
|||
}
|
||||
|
||||
private void triggerUserCacheRefresh(boolean bypass, TableName... tables) throws Exception {
|
||||
triggerCacheRefresh(bypass, true, false, false, tables);
|
||||
triggerCacheRefresh(bypass, true, false, false, false, tables);
|
||||
}
|
||||
|
||||
private void triggerTableCacheRefresh(boolean bypass, TableName... tables) throws Exception {
|
||||
triggerCacheRefresh(bypass, false, true, false, tables);
|
||||
triggerCacheRefresh(bypass, false, true, false, false, tables);
|
||||
}
|
||||
|
||||
private void triggerNamespaceCacheRefresh(boolean bypass, TableName... tables) throws Exception {
|
||||
triggerCacheRefresh(bypass, false, false, true, tables);
|
||||
triggerCacheRefresh(bypass, false, false, true, false, tables);
|
||||
}
|
||||
|
||||
private void triggerCacheRefresh(boolean bypass, boolean userLimiter, boolean tableLimiter,
|
||||
boolean nsLimiter, final TableName... tables) throws Exception {
|
||||
boolean nsLimiter, boolean rsLimiter, final TableName... tables) throws Exception {
|
||||
envEdge.incValue(2 * REFRESH_TIME);
|
||||
for (RegionServerThread rst: TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
|
||||
RegionServerRpcQuotaManager quotaManager = rst.getRegionServer().getRegionServerRpcQuotaManager();
|
||||
|
@ -663,6 +688,11 @@ public class TestQuotaThrottle {
|
|||
if (nsLimiter) {
|
||||
isBypass &= quotaCache.getNamespaceLimiter(table.getNamespaceAsString()).isBypass();
|
||||
}
|
||||
if (rsLimiter) {
|
||||
isBypass &= quotaCache
|
||||
.getRegionServerQuotaLimiter(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY)
|
||||
.isBypass();
|
||||
}
|
||||
if (isBypass != bypass) {
|
||||
envEdge.incValue(100);
|
||||
isUpdated = false;
|
||||
|
@ -675,6 +705,7 @@ public class TestQuotaThrottle {
|
|||
LOG.debug(Objects.toString(quotaCache.getNamespaceQuotaCache()));
|
||||
LOG.debug(Objects.toString(quotaCache.getTableQuotaCache()));
|
||||
LOG.debug(Objects.toString(quotaCache.getUserQuotaCache()));
|
||||
LOG.debug(Objects.toString(quotaCache.getRegionServerQuotaCache()));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2697,6 +2697,15 @@ public class TestAccessController extends SecureTestUtil {
|
|||
}
|
||||
};
|
||||
|
||||
AccessTestAction setRegionServerQuotaAction = new AccessTestAction() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
ACCESS_CONTROLLER.preSetRegionServerQuota(ObserverContextImpl.createAndPrepare(CP_ENV),
|
||||
null, null);
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
verifyAllowed(setUserQuotaAction, SUPERUSER, USER_ADMIN, USER_GROUP_ADMIN);
|
||||
verifyDenied(setUserQuotaAction, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER,
|
||||
USER_GROUP_READ, USER_GROUP_WRITE, USER_GROUP_CREATE);
|
||||
|
@ -2715,6 +2724,10 @@ public class TestAccessController extends SecureTestUtil {
|
|||
verifyAllowed(setNamespaceQuotaAction, SUPERUSER, USER_ADMIN, USER_GROUP_ADMIN);
|
||||
verifyDenied(setNamespaceQuotaAction, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER,
|
||||
USER_GROUP_READ, USER_GROUP_WRITE, USER_GROUP_CREATE);
|
||||
|
||||
verifyAllowed(setRegionServerQuotaAction, SUPERUSER, USER_ADMIN, USER_GROUP_ADMIN);
|
||||
verifyDenied(setRegionServerQuotaAction, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER,
|
||||
USER_GROUP_READ, USER_GROUP_WRITE, USER_GROUP_CREATE);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -82,8 +82,11 @@ module Hbase
|
|||
namespace = args.delete(NAMESPACE)
|
||||
raise(ArgumentError, 'Unexpected arguments: ' + args.inspect) unless args.empty?
|
||||
settings = QuotaSettingsFactory.throttleNamespace(namespace, type, limit, time_unit)
|
||||
elsif args.key?(REGIONSERVER)
|
||||
# TODO: Setting specified region server quota isn't supported currently and using 'all' for all RS
|
||||
settings = QuotaSettingsFactory.throttleRegionServer('all', type, limit, time_unit)
|
||||
else
|
||||
raise 'One of USER, TABLE or NAMESPACE must be specified'
|
||||
raise 'One of USER, TABLE, NAMESPACE or REGIONSERVER must be specified'
|
||||
end
|
||||
@admin.setQuota(settings)
|
||||
end
|
||||
|
@ -112,8 +115,13 @@ module Hbase
|
|||
namespace = args.delete(NAMESPACE)
|
||||
raise(ArgumentError, 'Unexpected arguments: ' + args.inspect) unless args.empty?
|
||||
settings = QuotaSettingsFactory.unthrottleNamespace(namespace)
|
||||
elsif args.key?(REGIONSERVER)
|
||||
regionServer = args.delete(REGIONSERVER)
|
||||
raise(ArgumentError, 'Unexpected arguments: ' + args.inspect) unless args.empty?
|
||||
# TODO: Setting specified region server quota isn't supported currently and using 'all' for all RS
|
||||
settings = QuotaSettingsFactory.unthrottleRegionServer('all')
|
||||
else
|
||||
raise 'One of USER, TABLE or NAMESPACE must be specified'
|
||||
raise 'One of USER, TABLE, NAMESPACE or REGIONSERVER must be specified'
|
||||
end
|
||||
@admin.setQuota(settings)
|
||||
end
|
||||
|
@ -233,7 +241,8 @@ module Hbase
|
|||
owner = {
|
||||
USER => settings.getUserName,
|
||||
TABLE => settings.getTableName,
|
||||
NAMESPACE => settings.getNamespace
|
||||
NAMESPACE => settings.getNamespace,
|
||||
REGIONSERVER => settings.getRegionServer
|
||||
}.delete_if { |_k, v| v.nil? }.map { |k, v| k.to_s + ' => ' + v.to_s } * ', '
|
||||
|
||||
yield owner, settings.to_s
|
||||
|
|
|
@ -22,7 +22,7 @@ module Shell
|
|||
class SetQuota < Command
|
||||
def help
|
||||
<<-EOF
|
||||
Set a quota for a user, table, or namespace.
|
||||
Set a quota for a user, table, namespace or region server.
|
||||
Syntax : set_quota TYPE => <type>, <args>
|
||||
|
||||
TYPE => THROTTLE
|
||||
|
@ -55,6 +55,10 @@ For example:
|
|||
hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => NONE
|
||||
hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => WRITE, USER => 'u1', LIMIT => NONE
|
||||
|
||||
hbase> set_quota TYPE => THROTTLE, REGIONSERVER => 'all', LIMIT => '30000req/sec'
|
||||
hbase> set_quota TYPE => THROTTLE, REGIONSERVER => 'all', THROTTLE_TYPE => WRITE, LIMIT => '20000req/sec'
|
||||
hbase> set_quota TYPE => THROTTLE, REGIONSERVER => 'all', LIMIT => NONE
|
||||
|
||||
hbase> set_quota USER => 'u1', GLOBAL_BYPASS => true
|
||||
|
||||
TYPE => SPACE
|
||||
|
|
|
@ -251,6 +251,27 @@ module Hbase
|
|||
output = capture_stdout { command(:enable_rpc_throttle) }
|
||||
assert(output.include?('Previous rpc throttle state : false'))
|
||||
end
|
||||
|
||||
define_test 'can set and remove region server quota' do
|
||||
command(:set_quota, TYPE => THROTTLE, REGIONSERVER => 'all', LIMIT => '1CU/sec')
|
||||
output = capture_stdout{ command(:list_quotas) }
|
||||
assert(output.include?('REGIONSERVER => all'))
|
||||
assert(output.include?('TYPE => THROTTLE'))
|
||||
assert(output.include?('THROTTLE_TYPE => REQUEST_CAPACITY_UNIT'))
|
||||
assert(output.include?('LIMIT => 1CU/sec'))
|
||||
|
||||
command(:set_quota, TYPE => THROTTLE, REGIONSERVER => 'all', THROTTLE_TYPE => WRITE, LIMIT => '2req/sec')
|
||||
output = capture_stdout{ command(:list_quotas) }
|
||||
assert(output.include?('REGIONSERVER => all'))
|
||||
assert(output.include?('TYPE => THROTTLE'))
|
||||
assert(output.include?('THROTTLE_TYPE => WRITE'))
|
||||
assert(output.include?('THROTTLE_TYPE => REQUEST_CAPACITY_UNIT'))
|
||||
assert(output.include?('LIMIT => 2req/sec'))
|
||||
|
||||
command(:set_quota, TYPE => THROTTLE, REGIONSERVER => 'all', LIMIT => NONE)
|
||||
output = capture_stdout{ command(:list_quotas) }
|
||||
assert(output.include?('0 row(s)'))
|
||||
end
|
||||
end
|
||||
# rubocop:enable Metrics/ClassLength
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue