HBASE-21820 Implement CLUSTER quota scope
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
b1c42f1009
commit
9370347efe
|
@ -203,7 +203,21 @@ public class QuotaSettingsFactory {
|
|||
*/
|
||||
public static QuotaSettings throttleUser(final String userName, final ThrottleType type,
|
||||
final long limit, final TimeUnit timeUnit) {
|
||||
return throttle(userName, null, null, null, type, limit, timeUnit);
|
||||
return throttleUser(userName, type, limit, timeUnit, QuotaScope.MACHINE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Throttle the specified user.
|
||||
* @param userName the user to throttle
|
||||
* @param type the type of throttling
|
||||
* @param limit the allowed number of request/data per timeUnit
|
||||
* @param timeUnit the limit time unit
|
||||
* @param scope the scope of throttling
|
||||
* @return the quota settings
|
||||
*/
|
||||
public static QuotaSettings throttleUser(final String userName, final ThrottleType type,
|
||||
final long limit, final TimeUnit timeUnit, QuotaScope scope) {
|
||||
return throttle(userName, null, null, null, type, limit, timeUnit, scope);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -218,7 +232,22 @@ public class QuotaSettingsFactory {
|
|||
*/
|
||||
public static QuotaSettings throttleUser(final String userName, final TableName tableName,
|
||||
final ThrottleType type, final long limit, final TimeUnit timeUnit) {
|
||||
return throttle(userName, tableName, null, null, type, limit, timeUnit);
|
||||
return throttleUser(userName, tableName, type, limit, timeUnit, QuotaScope.MACHINE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Throttle the specified user on the specified table.
|
||||
* @param userName the user to throttle
|
||||
* @param tableName the table to throttle
|
||||
* @param type the type of throttling
|
||||
* @param limit the allowed number of request/data per timeUnit
|
||||
* @param timeUnit the limit time unit
|
||||
* @param scope the scope of throttling
|
||||
* @return the quota settings
|
||||
*/
|
||||
public static QuotaSettings throttleUser(final String userName, final TableName tableName,
|
||||
final ThrottleType type, final long limit, final TimeUnit timeUnit, QuotaScope scope) {
|
||||
return throttle(userName, tableName, null, null, type, limit, timeUnit, scope);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -233,7 +262,22 @@ public class QuotaSettingsFactory {
|
|||
*/
|
||||
public static QuotaSettings throttleUser(final String userName, final String namespace,
|
||||
final ThrottleType type, final long limit, final TimeUnit timeUnit) {
|
||||
return throttle(userName, null, namespace, null, type, limit, timeUnit);
|
||||
return throttleUser(userName, namespace, type, limit, timeUnit, QuotaScope.MACHINE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Throttle the specified user on the specified namespace.
|
||||
* @param userName the user to throttle
|
||||
* @param namespace the namespace to throttle
|
||||
* @param type the type of throttling
|
||||
* @param limit the allowed number of request/data per timeUnit
|
||||
* @param timeUnit the limit time unit
|
||||
* @param scope the scope of throttling
|
||||
* @return the quota settings
|
||||
*/
|
||||
public static QuotaSettings throttleUser(final String userName, final String namespace,
|
||||
final ThrottleType type, final long limit, final TimeUnit timeUnit, QuotaScope scope) {
|
||||
return throttle(userName, null, namespace, null, type, limit, timeUnit, scope);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -243,7 +287,7 @@ public class QuotaSettingsFactory {
|
|||
* @return the quota settings
|
||||
*/
|
||||
public static QuotaSettings unthrottleUser(final String userName) {
|
||||
return throttle(userName, null, null, null, null, 0, null);
|
||||
return throttle(userName, null, null, null, null, 0, null, QuotaScope.MACHINE);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -254,7 +298,7 @@ public class QuotaSettingsFactory {
|
|||
* @return the quota settings
|
||||
*/
|
||||
public static QuotaSettings unthrottleUser(final String userName, final TableName tableName) {
|
||||
return throttle(userName, tableName, null, null, null, 0, null);
|
||||
return throttle(userName, tableName, null, null, null, 0, null, QuotaScope.MACHINE);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -265,7 +309,7 @@ public class QuotaSettingsFactory {
|
|||
* @return the quota settings
|
||||
*/
|
||||
public static QuotaSettings unthrottleUser(final String userName, final String namespace) {
|
||||
return throttle(userName, null, namespace, null, null, 0, null);
|
||||
return throttle(userName, null, namespace, null, null, 0, null, QuotaScope.MACHINE);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -279,7 +323,21 @@ public class QuotaSettingsFactory {
|
|||
*/
|
||||
public static QuotaSettings throttleTable(final TableName tableName, final ThrottleType type,
|
||||
final long limit, final TimeUnit timeUnit) {
|
||||
return throttle(null, tableName, null, null, type, limit, timeUnit);
|
||||
return throttleTable(tableName, type, limit, timeUnit, QuotaScope.MACHINE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Throttle the specified table.
|
||||
* @param tableName the table to throttle
|
||||
* @param type the type of throttling
|
||||
* @param limit the allowed number of request/data per timeUnit
|
||||
* @param timeUnit the limit time unit
|
||||
* @param scope the scope of throttling
|
||||
* @return the quota settings
|
||||
*/
|
||||
public static QuotaSettings throttleTable(final TableName tableName, final ThrottleType type,
|
||||
final long limit, final TimeUnit timeUnit, QuotaScope scope) {
|
||||
return throttle(null, tableName, null, null, type, limit, timeUnit, scope);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -289,7 +347,7 @@ public class QuotaSettingsFactory {
|
|||
* @return the quota settings
|
||||
*/
|
||||
public static QuotaSettings unthrottleTable(final TableName tableName) {
|
||||
return throttle(null, tableName, null, null, null, 0, null);
|
||||
return throttle(null, tableName, null, null, null, 0, null, QuotaScope.MACHINE);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -303,7 +361,21 @@ public class QuotaSettingsFactory {
|
|||
*/
|
||||
public static QuotaSettings throttleNamespace(final String namespace, final ThrottleType type,
|
||||
final long limit, final TimeUnit timeUnit) {
|
||||
return throttle(null, null, namespace, null, type, limit, timeUnit);
|
||||
return throttleNamespace(namespace, type, limit, timeUnit, QuotaScope.MACHINE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Throttle the specified namespace.
|
||||
* @param namespace the namespace to throttle
|
||||
* @param type the type of throttling
|
||||
* @param limit the allowed number of request/data per timeUnit
|
||||
* @param timeUnit the limit time unit
|
||||
* @param scope the scope of throttling
|
||||
* @return the quota settings
|
||||
*/
|
||||
public static QuotaSettings throttleNamespace(final String namespace, final ThrottleType type,
|
||||
final long limit, final TimeUnit timeUnit, QuotaScope scope) {
|
||||
return throttle(null, null, namespace, null, type, limit, timeUnit, scope);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -313,7 +385,7 @@ public class QuotaSettingsFactory {
|
|||
* @return the quota settings
|
||||
*/
|
||||
public static QuotaSettings unthrottleNamespace(final String namespace) {
|
||||
return throttle(null, null, namespace, null, null, 0, null);
|
||||
return throttle(null, null, namespace, null, null, 0, null, QuotaScope.MACHINE);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -327,7 +399,7 @@ public class QuotaSettingsFactory {
|
|||
*/
|
||||
public static QuotaSettings throttleRegionServer(final String regionServer,
|
||||
final ThrottleType type, final long limit, final TimeUnit timeUnit) {
|
||||
return throttle(null, null, null, regionServer, type, limit, timeUnit);
|
||||
return throttle(null, null, null, regionServer, type, limit, timeUnit, QuotaScope.MACHINE);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -337,19 +409,19 @@ public class QuotaSettingsFactory {
|
|||
* @return the quota settings
|
||||
*/
|
||||
public static QuotaSettings unthrottleRegionServer(final String regionServer) {
|
||||
return throttle(null, null, null, regionServer, null, 0, null);
|
||||
return throttle(null, null, null, regionServer, null, 0, null, QuotaScope.MACHINE);
|
||||
}
|
||||
|
||||
/* Throttle helper */
|
||||
private static QuotaSettings throttle(final String userName, final TableName tableName,
|
||||
final String namespace, final String regionServer, final ThrottleType type, final long limit,
|
||||
final TimeUnit timeUnit) {
|
||||
final TimeUnit timeUnit, QuotaScope scope) {
|
||||
QuotaProtos.ThrottleRequest.Builder builder = QuotaProtos.ThrottleRequest.newBuilder();
|
||||
if (type != null) {
|
||||
builder.setType(ProtobufUtil.toProtoThrottleType(type));
|
||||
}
|
||||
if (timeUnit != null) {
|
||||
builder.setTimedQuota(ProtobufUtil.toTimedQuota(limit, timeUnit, QuotaScope.MACHINE));
|
||||
builder.setTimedQuota(ProtobufUtil.toTimedQuota(limit, timeUnit, scope));
|
||||
}
|
||||
return new ThrottleSettings(userName, tableName, namespace, regionServer, builder.build());
|
||||
}
|
||||
|
|
|
@ -20,11 +20,14 @@ package org.apache.hadoop.hbase.quotas;
|
|||
|
||||
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
|
||||
|
||||
import org.apache.hadoop.hbase.ClusterMetrics.Option;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -72,6 +75,10 @@ public class QuotaCache implements Stoppable {
|
|||
private final ConcurrentHashMap<String, QuotaState> regionServerQuotaCache =
|
||||
new ConcurrentHashMap<>();
|
||||
private volatile boolean exceedThrottleQuotaEnabled = false;
|
||||
// factors used to divide cluster scope quota into machine scope quota
|
||||
private volatile double machineQuotaFactor = 1;
|
||||
private final ConcurrentHashMap<TableName, Double> tableMachineQuotaFactors =
|
||||
new ConcurrentHashMap<>();
|
||||
private final RegionServerServices rsServices;
|
||||
|
||||
private QuotaRefresherChore refreshChore;
|
||||
|
@ -228,6 +235,7 @@ public class QuotaCache implements Stoppable {
|
|||
QuotaCache.this.regionServerQuotaCache.putIfAbsent(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY,
|
||||
new QuotaState());
|
||||
|
||||
updateQuotaFactors();
|
||||
fetchNamespaceQuotaState();
|
||||
fetchTableQuotaState();
|
||||
fetchUserQuotaState();
|
||||
|
@ -246,7 +254,8 @@ public class QuotaCache implements Stoppable {
|
|||
@Override
|
||||
public Map<String, QuotaState> fetchEntries(final List<Get> gets)
|
||||
throws IOException {
|
||||
return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets);
|
||||
return QuotaUtil.fetchNamespaceQuotas(rsServices.getConnection(), gets,
|
||||
machineQuotaFactor);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -261,7 +270,8 @@ public class QuotaCache implements Stoppable {
|
|||
@Override
|
||||
public Map<TableName, QuotaState> fetchEntries(final List<Get> gets)
|
||||
throws IOException {
|
||||
return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets);
|
||||
return QuotaUtil.fetchTableQuotas(rsServices.getConnection(), gets,
|
||||
tableMachineQuotaFactors);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -278,7 +288,8 @@ public class QuotaCache implements Stoppable {
|
|||
@Override
|
||||
public Map<String, UserQuotaState> fetchEntries(final List<Get> gets)
|
||||
throws IOException {
|
||||
return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets);
|
||||
return QuotaUtil.fetchUserQuotas(rsServices.getConnection(), gets,
|
||||
tableMachineQuotaFactors, machineQuotaFactor);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -351,6 +362,46 @@ public class QuotaCache implements Stoppable {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update quota factors which is used to divide cluster scope quota into machine scope quota
|
||||
*
|
||||
* For user/namespace/user over namespace quota, use [1 / RSNum] as machine factor.
|
||||
* For table/user over table quota, use [1 / TotalTableRegionNum * MachineTableRegionNum]
|
||||
* as machine factor.
|
||||
*/
|
||||
private void updateQuotaFactors() {
|
||||
// Update machine quota factor
|
||||
try {
|
||||
int rsSize = rsServices.getConnection().getAdmin()
|
||||
.getClusterMetrics(EnumSet.of(Option.SERVERS_NAME)).getServersName().size();
|
||||
if (rsSize != 0) {
|
||||
// TODO if use rs group, the cluster limit should be shared by the rs group
|
||||
machineQuotaFactor = 1.0 / rsSize;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Get live region servers failed", e);
|
||||
}
|
||||
|
||||
// Update table machine quota factors
|
||||
for (TableName tableName : tableQuotaCache.keySet()) {
|
||||
double factor = 1;
|
||||
try {
|
||||
long regionSize =
|
||||
MetaTableAccessor.getTableRegions(rsServices.getConnection(), tableName, true)
|
||||
.stream().filter(regionInfo -> !regionInfo.isOffline()).count();
|
||||
if (regionSize == 0) {
|
||||
factor = 0;
|
||||
} else {
|
||||
int localRegionSize = rsServices.getRegions(tableName).size();
|
||||
factor = 1.0 * localRegionSize / regionSize;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Get table regions failed: {}", tableName, e);
|
||||
}
|
||||
tableMachineQuotaFactors.put(tableName, factor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static interface Fetcher<Key, Value> {
|
||||
|
|
|
@ -48,6 +48,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;
|
||||
|
@ -266,7 +267,8 @@ public class QuotaUtil extends QuotaTableUtil {
|
|||
}
|
||||
|
||||
public static Map<String, UserQuotaState> fetchUserQuotas(final Connection connection,
|
||||
final List<Get> gets) throws IOException {
|
||||
final List<Get> gets, Map<TableName, Double> tableMachineQuotaFactors, double factor)
|
||||
throws IOException {
|
||||
long nowTs = EnvironmentEdgeManager.currentTime();
|
||||
Result[] results = doGet(connection, gets);
|
||||
|
||||
|
@ -286,16 +288,21 @@ public class QuotaUtil extends QuotaTableUtil {
|
|||
parseUserResult(user, results[i], new UserQuotasVisitor() {
|
||||
@Override
|
||||
public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
|
||||
quotas = updateClusterQuotaToMachineQuota(quotas, factor);
|
||||
quotaInfo.setQuotas(namespace, quotas);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
|
||||
quotas = updateClusterQuotaToMachineQuota(quotas,
|
||||
tableMachineQuotaFactors.containsKey(table) ? tableMachineQuotaFactors.get(table)
|
||||
: 1);
|
||||
quotaInfo.setQuotas(table, quotas);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void visitUserQuotas(String userName, Quotas quotas) {
|
||||
quotas = updateClusterQuotaToMachineQuota(quotas, factor);
|
||||
quotaInfo.setQuotas(quotas);
|
||||
}
|
||||
});
|
||||
|
@ -308,24 +315,34 @@ public class QuotaUtil extends QuotaTableUtil {
|
|||
}
|
||||
|
||||
public static Map<TableName, QuotaState> fetchTableQuotas(final Connection connection,
|
||||
final List<Get> gets) throws IOException {
|
||||
final List<Get> gets, Map<TableName, Double> tableMachineFactors) throws IOException {
|
||||
return fetchGlobalQuotas("table", connection, gets, new KeyFromRow<TableName>() {
|
||||
@Override
|
||||
public TableName getKeyFromRow(final byte[] row) {
|
||||
assert isTableRowKey(row);
|
||||
return getTableFromRowKey(row);
|
||||
}
|
||||
|
||||
@Override
|
||||
public double getFactor(TableName tableName) {
|
||||
return tableMachineFactors.containsKey(tableName) ? tableMachineFactors.get(tableName) : 1;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public static Map<String, QuotaState> fetchNamespaceQuotas(final Connection connection,
|
||||
final List<Get> gets) throws IOException {
|
||||
final List<Get> gets, double factor) throws IOException {
|
||||
return fetchGlobalQuotas("namespace", connection, gets, new KeyFromRow<String>() {
|
||||
@Override
|
||||
public String getKeyFromRow(final byte[] row) {
|
||||
assert isNamespaceRowKey(row);
|
||||
return getNamespaceFromRowKey(row);
|
||||
}
|
||||
|
||||
@Override
|
||||
public double getFactor(String s) {
|
||||
return factor;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -337,6 +354,11 @@ public class QuotaUtil extends QuotaTableUtil {
|
|||
assert isRegionServerRowKey(row);
|
||||
return getRegionServerFromRowKey(row);
|
||||
}
|
||||
|
||||
@Override
|
||||
public double getFactor(String s) {
|
||||
return 1;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -362,6 +384,8 @@ public class QuotaUtil extends QuotaTableUtil {
|
|||
|
||||
try {
|
||||
Quotas quotas = quotasFromData(data);
|
||||
quotas = updateClusterQuotaToMachineQuota(quotas,
|
||||
kfr.getFactor(key));
|
||||
quotaInfo.setQuotas(quotas);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to parse " + type + " '" + key + "' quotas", e);
|
||||
|
@ -371,8 +395,62 @@ public class QuotaUtil extends QuotaTableUtil {
|
|||
return globalQuotas;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert cluster scope quota to machine scope quota
|
||||
* @param quotas the original quota
|
||||
* @param factor factor used to divide cluster limiter to machine limiter
|
||||
* @return the converted quota whose quota limiters all in machine scope
|
||||
*/
|
||||
private static Quotas updateClusterQuotaToMachineQuota(Quotas quotas, double factor) {
|
||||
Quotas.Builder newQuotas = Quotas.newBuilder(quotas);
|
||||
if (newQuotas.hasThrottle()) {
|
||||
Throttle.Builder throttle = Throttle.newBuilder(newQuotas.getThrottle());
|
||||
if (throttle.hasReqNum()) {
|
||||
throttle.setReqNum(updateTimedQuota(throttle.getReqNum(), factor));
|
||||
}
|
||||
if (throttle.hasReqSize()) {
|
||||
throttle.setReqSize(updateTimedQuota(throttle.getReqSize(), factor));
|
||||
}
|
||||
if (throttle.hasReadNum()) {
|
||||
throttle.setReadNum(updateTimedQuota(throttle.getReadNum(), factor));
|
||||
}
|
||||
if (throttle.hasReadSize()) {
|
||||
throttle.setReadSize(updateTimedQuota(throttle.getReadSize(), factor));
|
||||
}
|
||||
if (throttle.hasWriteNum()) {
|
||||
throttle.setWriteNum(updateTimedQuota(throttle.getWriteNum(), factor));
|
||||
}
|
||||
if (throttle.hasWriteSize()) {
|
||||
throttle.setWriteSize(updateTimedQuota(throttle.getWriteSize(), factor));
|
||||
}
|
||||
if (throttle.hasReqCapacityUnit()) {
|
||||
throttle.setReqCapacityUnit(updateTimedQuota(throttle.getReqCapacityUnit(), factor));
|
||||
}
|
||||
if (throttle.hasReadCapacityUnit()) {
|
||||
throttle.setReadCapacityUnit(updateTimedQuota(throttle.getReadCapacityUnit(), factor));
|
||||
}
|
||||
if (throttle.hasWriteCapacityUnit()) {
|
||||
throttle.setWriteCapacityUnit(updateTimedQuota(throttle.getWriteCapacityUnit(), factor));
|
||||
}
|
||||
newQuotas.setThrottle(throttle.build());
|
||||
}
|
||||
return newQuotas.build();
|
||||
}
|
||||
|
||||
private static TimedQuota updateTimedQuota(TimedQuota timedQuota, double factor) {
|
||||
if (timedQuota.getScope() == QuotaScope.CLUSTER) {
|
||||
TimedQuota.Builder newTimedQuota = TimedQuota.newBuilder(timedQuota);
|
||||
newTimedQuota.setSoftLimit(Math.max(1, (long) (timedQuota.getSoftLimit() * factor)))
|
||||
.setScope(QuotaScope.MACHINE);
|
||||
return newTimedQuota.build();
|
||||
} else {
|
||||
return timedQuota;
|
||||
}
|
||||
}
|
||||
|
||||
private static interface KeyFromRow<T> {
|
||||
T getKeyFromRow(final byte[] row);
|
||||
double getFactor(T t);
|
||||
}
|
||||
|
||||
/* =========================================================================
|
||||
|
|
|
@ -0,0 +1,236 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doGets;
|
||||
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doPuts;
|
||||
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerNamespaceCacheRefresh;
|
||||
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerTableCacheRefresh;
|
||||
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerUserCacheRefresh;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ RegionServerTests.class, LargeTests.class })
|
||||
public class TestClusterScopeQuotaThrottle {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestClusterScopeQuotaThrottle.class);
|
||||
|
||||
private final static int REFRESH_TIME = 30 * 60000;
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
private final static TableName[] TABLE_NAMES =
|
||||
new TableName[] { TableName.valueOf("TestQuotaAdmin0"), TableName.valueOf("TestQuotaAdmin1"),
|
||||
TableName.valueOf("TestQuotaAdmin2") };
|
||||
private final static byte[] FAMILY = Bytes.toBytes("cf");
|
||||
private final static byte[] QUALIFIER = Bytes.toBytes("q");
|
||||
private final static byte[][] SPLITS = new byte[][] { Bytes.toBytes("1") };
|
||||
private static Table[] tables;
|
||||
|
||||
private final static String NAMESPACE = "TestNs";
|
||||
private final static TableName TABLE_NAME = TableName.valueOf(NAMESPACE, "TestTable");
|
||||
private static Table table;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
|
||||
TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME);
|
||||
TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 10);
|
||||
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
|
||||
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
|
||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
|
||||
TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true);
|
||||
TEST_UTIL.startMiniCluster(2);
|
||||
TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
|
||||
QuotaCache.TEST_FORCE_REFRESH = true;
|
||||
|
||||
tables = new Table[TABLE_NAMES.length];
|
||||
for (int i = 0; i < TABLE_NAMES.length; ++i) {
|
||||
tables[i] = TEST_UTIL.createTable(TABLE_NAMES[i], FAMILY);
|
||||
TEST_UTIL.waitTableAvailable(TABLE_NAMES[i]);
|
||||
}
|
||||
TEST_UTIL.getAdmin().createNamespace(NamespaceDescriptor.create(NAMESPACE).build());
|
||||
table = TEST_UTIL.createTable(TABLE_NAME, FAMILY, SPLITS);
|
||||
TEST_UTIL.waitTableAvailable(TABLE_NAME);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
EnvironmentEdgeManager.reset();
|
||||
for (int i = 0; i < tables.length; ++i) {
|
||||
if (tables[i] != null) {
|
||||
tables[i].close();
|
||||
TEST_UTIL.deleteTable(TABLE_NAMES[i]);
|
||||
}
|
||||
}
|
||||
TEST_UTIL.deleteTable(TABLE_NAME);
|
||||
TEST_UTIL.getAdmin().deleteNamespace(NAMESPACE);
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNamespaceClusterScopeQuota() throws Exception {
|
||||
final Admin admin = TEST_UTIL.getAdmin();
|
||||
final String NAMESPACE = "default";
|
||||
|
||||
// Add 10req/min limit for write request in cluster scope
|
||||
admin.setQuota(QuotaSettingsFactory.throttleNamespace(NAMESPACE, ThrottleType.WRITE_NUMBER, 10,
|
||||
TimeUnit.MINUTES, QuotaScope.CLUSTER));
|
||||
// Add 6req/min limit for read request in machine scope
|
||||
admin.setQuota(QuotaSettingsFactory.throttleNamespace(NAMESPACE, ThrottleType.READ_NUMBER, 6,
|
||||
TimeUnit.MINUTES, QuotaScope.MACHINE));
|
||||
triggerNamespaceCacheRefresh(TEST_UTIL, false, TABLE_NAMES[0]);
|
||||
// should execute at max 5 write requests and at max 3 read requests
|
||||
assertEquals(5, doPuts(10, FAMILY, QUALIFIER, tables[0]));
|
||||
assertEquals(6, doGets(10, tables[0]));
|
||||
// Remove all the limits
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleNamespace(NAMESPACE));
|
||||
triggerNamespaceCacheRefresh(TEST_UTIL, true, TABLE_NAMES[0]);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTableClusterScopeQuota() throws Exception {
|
||||
final Admin admin = TEST_UTIL.getAdmin();
|
||||
admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAME, ThrottleType.READ_NUMBER, 20,
|
||||
TimeUnit.HOURS, QuotaScope.CLUSTER));
|
||||
triggerTableCacheRefresh(TEST_UTIL, false, TABLE_NAME);
|
||||
for (RegionServerThread rst : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
|
||||
for (TableName tableName : rst.getRegionServer().getOnlineTables()) {
|
||||
if (tableName.getNameAsString().equals(TABLE_NAME.getNameAsString())) {
|
||||
int rsRegionNum = rst.getRegionServer().getRegions(tableName).size();
|
||||
if (rsRegionNum == 0) {
|
||||
// If rs has 0 region, the machine limiter is 0 (20 * 0 / 2)
|
||||
break;
|
||||
} else if (rsRegionNum == 1) {
|
||||
// If rs has 1 region, the machine limiter is 10 (20 * 1 / 2)
|
||||
// Read rows from 1 region, so can read 10 first time and 0 second time
|
||||
long count = doGets(20, table);
|
||||
assertTrue(count == 0 || count == 10);
|
||||
} else if (rsRegionNum == 2) {
|
||||
// If rs has 2 regions, the machine limiter is 20 (20 * 2 / 2)
|
||||
assertEquals(20, doGets(20, table));
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAME));
|
||||
triggerTableCacheRefresh(TEST_UTIL, true, TABLE_NAME);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUserClusterScopeQuota() throws Exception {
|
||||
final Admin admin = TEST_UTIL.getAdmin();
|
||||
final String userName = User.getCurrent().getShortName();
|
||||
|
||||
// Add 6req/min limit for read request in cluster scope
|
||||
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.READ_NUMBER, 6,
|
||||
TimeUnit.MINUTES, QuotaScope.CLUSTER));
|
||||
// Add 6req/min limit for write request in machine scope
|
||||
admin.setQuota(
|
||||
QuotaSettingsFactory.throttleUser(userName, ThrottleType.WRITE_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAMES);
|
||||
// should execute at max 6 read requests and at max 3 write write requests
|
||||
assertEquals(6, doPuts(10, FAMILY, QUALIFIER, tables[0]));
|
||||
assertEquals(3, doGets(10, tables[0]));
|
||||
// Remove all the limits
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
|
||||
triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAMES);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUserNamespaceClusterScopeQuota() throws Exception {
|
||||
final Admin admin = TEST_UTIL.getAdmin();
|
||||
final String userName = User.getCurrent().getShortName();
|
||||
final String namespace = TABLE_NAMES[0].getNamespaceAsString();
|
||||
|
||||
// Add 10req/min limit for read request in cluster scope
|
||||
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, namespace, ThrottleType.READ_NUMBER,
|
||||
10, TimeUnit.MINUTES, QuotaScope.CLUSTER));
|
||||
// Add 6req/min limit for write request in machine scope
|
||||
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, namespace, ThrottleType.WRITE_NUMBER,
|
||||
6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAMES[0]);
|
||||
// should execute at max 5 read requests and at max 6 write requests
|
||||
assertEquals(5, doGets(10, tables[0]));
|
||||
assertEquals(6, doPuts(10, FAMILY, QUALIFIER, tables[0]));
|
||||
|
||||
// Remove all the limits
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName, namespace));
|
||||
triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAMES[0]);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUserTableClusterScopeQuota() throws Exception {
|
||||
final Admin admin = TEST_UTIL.getAdmin();
|
||||
final String userName = User.getCurrent().getShortName();
|
||||
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, TABLE_NAME, ThrottleType.READ_NUMBER,
|
||||
20, TimeUnit.HOURS, QuotaScope.CLUSTER));
|
||||
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
|
||||
for (RegionServerThread rst : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
|
||||
for (TableName tableName : rst.getRegionServer().getOnlineTables()) {
|
||||
if (tableName.getNameAsString().equals(TABLE_NAME.getNameAsString())) {
|
||||
int rsRegionNum = rst.getRegionServer().getRegions(tableName).size();
|
||||
if (rsRegionNum == 0) {
|
||||
// If rs has 0 region, the machine limiter is 0 (20 * 0 / 2)
|
||||
break;
|
||||
} else if (rsRegionNum == 1) {
|
||||
// If rs has 1 region, the machine limiter is 10 (20 * 1 / 2)
|
||||
// Read rows from 1 region, so can read 10 first time and 0 second time
|
||||
long count = doGets(20, table);
|
||||
assertTrue(count == 0 || count == 10);
|
||||
} else if (rsRegionNum == 2) {
|
||||
// If rs has 2 regions, the machine limiter is 20 (20 * 2 / 2)
|
||||
assertEquals(20, doGets(20, table));
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
|
||||
triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
|
||||
}
|
||||
}
|
|
@ -639,6 +639,57 @@ public class TestQuotaAdmin {
|
|||
admin.setQuota(QuotaSettingsFactory.unthrottleRegionServer(regionServer));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQuotaScope() throws Exception {
|
||||
Admin admin = TEST_UTIL.getAdmin();
|
||||
String user = "user1";
|
||||
String namespace = "testQuotaScope_ns";
|
||||
TableName tableName = TableName.valueOf("testQuotaScope");
|
||||
QuotaFilter filter = new QuotaFilter();
|
||||
|
||||
// set CLUSTER quota scope for namespace
|
||||
admin.setQuota(QuotaSettingsFactory.throttleNamespace(namespace, ThrottleType.REQUEST_NUMBER,
|
||||
10, TimeUnit.MINUTES, QuotaScope.CLUSTER));
|
||||
assertNumResults(1, filter);
|
||||
verifyRecordPresentInQuotaTable(ThrottleType.REQUEST_NUMBER, 10, TimeUnit.MINUTES,
|
||||
QuotaScope.CLUSTER);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleNamespace(namespace, ThrottleType.REQUEST_NUMBER,
|
||||
10, TimeUnit.MINUTES, QuotaScope.MACHINE));
|
||||
assertNumResults(1, filter);
|
||||
verifyRecordPresentInQuotaTable(ThrottleType.REQUEST_NUMBER, 10, TimeUnit.MINUTES,
|
||||
QuotaScope.MACHINE);
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleNamespace(namespace));
|
||||
assertNumResults(0, filter);
|
||||
|
||||
// set CLUSTER quota scope for table
|
||||
admin.setQuota(QuotaSettingsFactory.throttleTable(tableName, ThrottleType.REQUEST_NUMBER, 10,
|
||||
TimeUnit.MINUTES, QuotaScope.CLUSTER));
|
||||
verifyRecordPresentInQuotaTable(ThrottleType.REQUEST_NUMBER, 10, TimeUnit.MINUTES,
|
||||
QuotaScope.CLUSTER);
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleTable(tableName));
|
||||
|
||||
// set CLUSTER quota scope for user
|
||||
admin.setQuota(QuotaSettingsFactory.throttleUser(user, ThrottleType.REQUEST_NUMBER, 10,
|
||||
TimeUnit.MINUTES, QuotaScope.CLUSTER));
|
||||
verifyRecordPresentInQuotaTable(ThrottleType.REQUEST_NUMBER, 10, TimeUnit.MINUTES,
|
||||
QuotaScope.CLUSTER);
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleUser(user));
|
||||
|
||||
// set CLUSTER quota scope for user and table
|
||||
admin.setQuota(QuotaSettingsFactory.throttleUser(user, tableName, ThrottleType.REQUEST_NUMBER,
|
||||
10, TimeUnit.MINUTES, QuotaScope.CLUSTER));
|
||||
verifyRecordPresentInQuotaTable(ThrottleType.REQUEST_NUMBER, 10, TimeUnit.MINUTES,
|
||||
QuotaScope.CLUSTER);
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleUser(user));
|
||||
|
||||
// set CLUSTER quota scope for user and namespace
|
||||
admin.setQuota(QuotaSettingsFactory.throttleUser(user, namespace, ThrottleType.REQUEST_NUMBER,
|
||||
10, TimeUnit.MINUTES, QuotaScope.CLUSTER));
|
||||
verifyRecordPresentInQuotaTable(ThrottleType.REQUEST_NUMBER, 10, TimeUnit.MINUTES,
|
||||
QuotaScope.CLUSTER);
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleUser(user));
|
||||
}
|
||||
|
||||
private void testSwitchRpcThrottle(Admin admin, boolean oldRpcThrottle, boolean newRpcThrottle)
|
||||
throws IOException {
|
||||
boolean state = admin.switchRpcThrottle(newRpcThrottle);
|
||||
|
@ -651,13 +702,18 @@ public class TestQuotaAdmin {
|
|||
|
||||
private void verifyRecordPresentInQuotaTable(ThrottleType type, long limit, TimeUnit tu)
|
||||
throws Exception {
|
||||
verifyRecordPresentInQuotaTable(type, limit, tu, QuotaScope.MACHINE);
|
||||
}
|
||||
|
||||
private void verifyRecordPresentInQuotaTable(ThrottleType type, long limit, TimeUnit tu,
|
||||
QuotaScope scope) throws Exception {
|
||||
// Verify the RPC Quotas in the table
|
||||
try (Table quotaTable = TEST_UTIL.getConnection().getTable(QuotaTableUtil.QUOTA_TABLE_NAME);
|
||||
ResultScanner scanner = quotaTable.getScanner(new Scan())) {
|
||||
Result r = Iterables.getOnlyElement(scanner);
|
||||
CellScanner cells = r.cellScanner();
|
||||
assertTrue("Expected to find a cell", cells.advance());
|
||||
assertRPCQuota(type, limit, tu, cells.current());
|
||||
assertRPCQuota(type, limit, tu, scope, cells.current());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -684,8 +740,8 @@ public class TestQuotaAdmin {
|
|||
}
|
||||
}
|
||||
|
||||
private void assertRPCQuota(ThrottleType type, long limit, TimeUnit tu, Cell cell)
|
||||
throws Exception {
|
||||
private void assertRPCQuota(ThrottleType type, long limit, TimeUnit tu, QuotaScope scope,
|
||||
Cell cell) throws Exception {
|
||||
Quotas q = QuotaTableUtil
|
||||
.quotasFromData(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
|
||||
assertTrue("Quota should have rpc quota defined", q.hasThrottle());
|
||||
|
@ -733,6 +789,7 @@ public class TestQuotaAdmin {
|
|||
default:
|
||||
}
|
||||
|
||||
assertEquals(scope, ProtobufUtil.toQuotaScope(t.getScope()));
|
||||
assertEquals(t.getSoftLimit(), limit);
|
||||
assertEquals(t.getTimeUnit(), ProtobufUtil.toProtoTimeUnit(tu));
|
||||
}
|
||||
|
|
|
@ -17,9 +17,16 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doGets;
|
||||
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doPuts;
|
||||
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerExceedThrottleQuotaCacheRefresh;
|
||||
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerNamespaceCacheRefresh;
|
||||
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerRegionServerCacheRefresh;
|
||||
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerTableCacheRefresh;
|
||||
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerUserCacheRefresh;
|
||||
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.waitMinuteQuota;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
|
@ -27,16 +34,12 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
||||
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -68,8 +71,6 @@ public class TestQuotaThrottle {
|
|||
TableName.valueOf("TestQuotaAdmin1"),
|
||||
TableName.valueOf("TestQuotaAdmin2")
|
||||
};
|
||||
|
||||
private static ManualEnvironmentEdge envEdge;
|
||||
private static Table[] tables;
|
||||
|
||||
@BeforeClass
|
||||
|
@ -89,10 +90,6 @@ public class TestQuotaThrottle {
|
|||
for (int i = 0; i < TABLE_NAMES.length; ++i) {
|
||||
tables[i] = TEST_UTIL.createTable(TABLE_NAMES[i], FAMILY);
|
||||
}
|
||||
|
||||
envEdge = new ManualEnvironmentEdge();
|
||||
envEdge.setValue(EnvironmentEdgeManager.currentTime());
|
||||
EnvironmentEdgeManagerTestHelper.injectEdge(envEdge);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -110,13 +107,7 @@ public class TestQuotaThrottle {
|
|||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
for (RegionServerThread rst: TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
|
||||
RegionServerRpcQuotaManager quotaManager = rst.getRegionServer().getRegionServerRpcQuotaManager();
|
||||
QuotaCache quotaCache = quotaManager.getQuotaCache();
|
||||
quotaCache.getNamespaceQuotaCache().clear();
|
||||
quotaCache.getTableQuotaCache().clear();
|
||||
quotaCache.getUserQuotaCache().clear();
|
||||
}
|
||||
ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -125,21 +116,21 @@ public class TestQuotaThrottle {
|
|||
final String userName = User.getCurrent().getShortName();
|
||||
|
||||
// Add 6req/min limit
|
||||
admin.setQuota(QuotaSettingsFactory
|
||||
.throttleUser(userName, ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(false, TABLE_NAMES);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_NUMBER, 6,
|
||||
TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAMES);
|
||||
|
||||
// should execute at max 6 requests
|
||||
assertEquals(6, doPuts(100, tables));
|
||||
assertEquals(6, doPuts(100, FAMILY, QUALIFIER, tables));
|
||||
|
||||
// wait a minute and you should get other 6 requests executed
|
||||
waitMinuteQuota();
|
||||
assertEquals(6, doPuts(100, tables));
|
||||
assertEquals(6, doPuts(100, FAMILY, QUALIFIER, tables));
|
||||
|
||||
// Remove all the limits
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
|
||||
triggerUserCacheRefresh(true, TABLE_NAMES);
|
||||
assertEquals(60, doPuts(60, tables));
|
||||
triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAMES);
|
||||
assertEquals(60, doPuts(60, FAMILY, QUALIFIER, tables));
|
||||
assertEquals(60, doGets(60, tables));
|
||||
}
|
||||
|
||||
|
@ -149,29 +140,29 @@ public class TestQuotaThrottle {
|
|||
final String userName = User.getCurrent().getShortName();
|
||||
|
||||
// Add 6req/min limit for read request
|
||||
admin.setQuota(QuotaSettingsFactory
|
||||
.throttleUser(userName, ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(false, TABLE_NAMES);
|
||||
admin.setQuota(
|
||||
QuotaSettingsFactory.throttleUser(userName, ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAMES);
|
||||
|
||||
// not limit for write request and should execute at max 6 read requests
|
||||
assertEquals(60, doPuts(60, tables));
|
||||
assertEquals(60, doPuts(60, FAMILY, QUALIFIER, tables));
|
||||
assertEquals(6, doGets(100, tables));
|
||||
|
||||
waitMinuteQuota();
|
||||
|
||||
// Add 6req/min limit for write request
|
||||
admin.setQuota(QuotaSettingsFactory
|
||||
.throttleUser(userName, ThrottleType.WRITE_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(false, TABLE_NAMES);
|
||||
admin.setQuota(
|
||||
QuotaSettingsFactory.throttleUser(userName, ThrottleType.WRITE_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAMES);
|
||||
|
||||
// should execute at max 6 read requests and at max 6 write write requests
|
||||
assertEquals(6, doGets(100, tables));
|
||||
assertEquals(6, doPuts(60, tables));
|
||||
assertEquals(6, doPuts(60, FAMILY, QUALIFIER, tables));
|
||||
|
||||
// Remove all the limits
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
|
||||
triggerUserCacheRefresh(true, TABLE_NAMES);
|
||||
assertEquals(60, doPuts(60, tables));
|
||||
triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAMES);
|
||||
assertEquals(60, doPuts(60, FAMILY, QUALIFIER, tables));
|
||||
assertEquals(60, doGets(60, tables));
|
||||
}
|
||||
|
||||
|
@ -181,22 +172,22 @@ public class TestQuotaThrottle {
|
|||
final String userName = User.getCurrent().getShortName();
|
||||
|
||||
// Add 6req/min limit
|
||||
admin.setQuota(QuotaSettingsFactory
|
||||
.throttleUser(userName, TABLE_NAMES[0], ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(false, TABLE_NAMES[0]);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, TABLE_NAMES[0],
|
||||
ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAMES[0]);
|
||||
|
||||
// should execute at max 6 requests on tables[0] and have no limit on tables[1]
|
||||
assertEquals(6, doPuts(100, tables[0]));
|
||||
assertEquals(30, doPuts(30, tables[1]));
|
||||
assertEquals(6, doPuts(100, FAMILY, QUALIFIER, tables[0]));
|
||||
assertEquals(30, doPuts(30, FAMILY, QUALIFIER, tables[1]));
|
||||
|
||||
// wait a minute and you should get other 6 requests executed
|
||||
waitMinuteQuota();
|
||||
assertEquals(6, doPuts(100, tables[0]));
|
||||
assertEquals(6, doPuts(100, FAMILY, QUALIFIER, tables[0]));
|
||||
|
||||
// Remove all the limits
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName, TABLE_NAMES[0]));
|
||||
triggerUserCacheRefresh(true, TABLE_NAMES);
|
||||
assertEquals(60, doPuts(60, tables));
|
||||
triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAMES);
|
||||
assertEquals(60, doPuts(60, FAMILY, QUALIFIER, tables));
|
||||
assertEquals(60, doGets(60, tables));
|
||||
}
|
||||
|
||||
|
@ -206,38 +197,38 @@ public class TestQuotaThrottle {
|
|||
final String userName = User.getCurrent().getShortName();
|
||||
|
||||
// Add 6req/min limit for write request on tables[0]
|
||||
admin.setQuota(QuotaSettingsFactory
|
||||
.throttleUser(userName, TABLE_NAMES[0], ThrottleType.WRITE_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(false, TABLE_NAMES[0]);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, TABLE_NAMES[0],
|
||||
ThrottleType.WRITE_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAMES[0]);
|
||||
|
||||
// should execute at max 6 write requests and have no limit for read request
|
||||
assertEquals(6, doPuts(100, tables[0]));
|
||||
assertEquals(6, doPuts(100, FAMILY, QUALIFIER, tables[0]));
|
||||
assertEquals(60, doGets(60, tables[0]));
|
||||
|
||||
// no limit on tables[1]
|
||||
assertEquals(60, doPuts(60, tables[1]));
|
||||
assertEquals(60, doPuts(60, FAMILY, QUALIFIER, tables[1]));
|
||||
assertEquals(60, doGets(60, tables[1]));
|
||||
|
||||
// wait a minute and you should get other 6 write requests executed
|
||||
// wait a minute and you should get other 6 write requests executed
|
||||
waitMinuteQuota();
|
||||
|
||||
// Add 6req/min limit for read request on tables[0]
|
||||
admin.setQuota(QuotaSettingsFactory
|
||||
.throttleUser(userName, TABLE_NAMES[0], ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(false, TABLE_NAMES[0]);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, TABLE_NAMES[0],
|
||||
ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAMES[0]);
|
||||
|
||||
// should execute at max 6 read requests and at max 6 write requests
|
||||
assertEquals(6, doPuts(100, tables[0]));
|
||||
assertEquals(6, doPuts(100, FAMILY, QUALIFIER, tables[0]));
|
||||
assertEquals(6, doGets(60, tables[0]));
|
||||
|
||||
// no limit on tables[1]
|
||||
assertEquals(30, doPuts(30, tables[1]));
|
||||
assertEquals(30, doPuts(30, FAMILY, QUALIFIER, tables[1]));
|
||||
assertEquals(30, doGets(30, tables[1]));
|
||||
|
||||
// Remove all the limits
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName, TABLE_NAMES[0]));
|
||||
triggerUserCacheRefresh(true, TABLE_NAMES);
|
||||
assertEquals(60, doPuts(60, tables));
|
||||
triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAMES);
|
||||
assertEquals(60, doPuts(60, FAMILY, QUALIFIER, tables));
|
||||
assertEquals(60, doGets(60, tables));
|
||||
}
|
||||
|
||||
|
@ -248,21 +239,21 @@ public class TestQuotaThrottle {
|
|||
final String NAMESPACE = "default";
|
||||
|
||||
// Add 6req/min limit
|
||||
admin.setQuota(QuotaSettingsFactory
|
||||
.throttleUser(userName, NAMESPACE, ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(false, TABLE_NAMES[0]);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, NAMESPACE,
|
||||
ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAMES[0]);
|
||||
|
||||
// should execute at max 6 requests on tables[0] and have no limit on tables[1]
|
||||
assertEquals(6, doPuts(100, tables[0]));
|
||||
assertEquals(6, doPuts(100, FAMILY, QUALIFIER, tables[0]));
|
||||
|
||||
// wait a minute and you should get other 6 requests executed
|
||||
waitMinuteQuota();
|
||||
assertEquals(6, doPuts(100, tables[1]));
|
||||
assertEquals(6, doPuts(100, FAMILY, QUALIFIER, tables[1]));
|
||||
|
||||
// Remove all the limits
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName, NAMESPACE));
|
||||
triggerUserCacheRefresh(true, TABLE_NAMES);
|
||||
assertEquals(60, doPuts(60, tables));
|
||||
triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAMES);
|
||||
assertEquals(60, doPuts(60, FAMILY, QUALIFIER, tables));
|
||||
assertEquals(60, doGets(60, tables));
|
||||
}
|
||||
|
||||
|
@ -273,29 +264,29 @@ public class TestQuotaThrottle {
|
|||
final String NAMESPACE = "default";
|
||||
|
||||
// Add 6req/min limit for read request
|
||||
admin.setQuota(QuotaSettingsFactory
|
||||
.throttleUser(userName, NAMESPACE, ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(false, TABLE_NAMES[0]);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, NAMESPACE, ThrottleType.READ_NUMBER,
|
||||
6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAMES[0]);
|
||||
|
||||
// should execute at max 6 read requests and have no limit for write request
|
||||
assertEquals(6, doGets(60, tables[0]));
|
||||
assertEquals(60, doPuts(60, tables[0]));
|
||||
assertEquals(60, doPuts(60, FAMILY, QUALIFIER, tables[0]));
|
||||
|
||||
waitMinuteQuota();
|
||||
|
||||
// Add 6req/min limit for write request, too
|
||||
admin.setQuota(QuotaSettingsFactory
|
||||
.throttleUser(userName, NAMESPACE, ThrottleType.WRITE_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(false, TABLE_NAMES[0]);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, NAMESPACE, ThrottleType.WRITE_NUMBER,
|
||||
6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAMES[0]);
|
||||
|
||||
// should execute at max 6 read requests and at max 6 write requests
|
||||
assertEquals(6, doGets(60, tables[0]));
|
||||
assertEquals(6, doPuts(60, tables[0]));
|
||||
assertEquals(6, doPuts(60, FAMILY, QUALIFIER, tables[0]));
|
||||
|
||||
// Remove all the limits
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName, NAMESPACE));
|
||||
triggerUserCacheRefresh(true, TABLE_NAMES);
|
||||
assertEquals(60, doPuts(60, tables));
|
||||
triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAMES);
|
||||
assertEquals(60, doPuts(60, FAMILY, QUALIFIER, tables));
|
||||
assertEquals(60, doGets(60, tables));
|
||||
}
|
||||
|
||||
|
@ -304,22 +295,22 @@ public class TestQuotaThrottle {
|
|||
final Admin admin = TEST_UTIL.getAdmin();
|
||||
|
||||
// Add 6req/min limit
|
||||
admin.setQuota(QuotaSettingsFactory
|
||||
.throttleTable(TABLE_NAMES[0], ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerTableCacheRefresh(false, TABLE_NAMES[0]);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0], ThrottleType.REQUEST_NUMBER,
|
||||
6, TimeUnit.MINUTES));
|
||||
triggerTableCacheRefresh(TEST_UTIL, false, TABLE_NAMES[0]);
|
||||
|
||||
// should execute at max 6 requests
|
||||
assertEquals(6, doPuts(100, tables[0]));
|
||||
assertEquals(6, doPuts(100, FAMILY, QUALIFIER, tables[0]));
|
||||
// should have no limits
|
||||
assertEquals(30, doPuts(30, tables[1]));
|
||||
assertEquals(30, doPuts(30, FAMILY, QUALIFIER, tables[1]));
|
||||
|
||||
// wait a minute and you should get other 6 requests executed
|
||||
waitMinuteQuota();
|
||||
assertEquals(6, doPuts(100, tables[0]));
|
||||
assertEquals(6, doPuts(100, FAMILY, QUALIFIER, tables[0]));
|
||||
|
||||
// Remove all the limits
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0]));
|
||||
triggerTableCacheRefresh(true, TABLE_NAMES[0]);
|
||||
triggerTableCacheRefresh(TEST_UTIL, true, TABLE_NAMES[0]);
|
||||
assertEquals(80, doGets(80, tables[0], tables[1]));
|
||||
}
|
||||
|
||||
|
@ -328,35 +319,35 @@ public class TestQuotaThrottle {
|
|||
final Admin admin = TEST_UTIL.getAdmin();
|
||||
|
||||
// Add 6req/min limit for read request
|
||||
admin.setQuota(QuotaSettingsFactory
|
||||
.throttleTable(TABLE_NAMES[0], ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerTableCacheRefresh(false, TABLE_NAMES[0]);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0], ThrottleType.READ_NUMBER, 6,
|
||||
TimeUnit.MINUTES));
|
||||
triggerTableCacheRefresh(TEST_UTIL, false, TABLE_NAMES[0]);
|
||||
|
||||
// should execute at max 6 read requests and have no limit for write request
|
||||
assertEquals(6, doGets(100, tables[0]));
|
||||
assertEquals(100, doPuts(100, tables[0]));
|
||||
assertEquals(100, doPuts(100, FAMILY, QUALIFIER, tables[0]));
|
||||
// should have no limits on tables[1]
|
||||
assertEquals(30, doPuts(30, tables[1]));
|
||||
assertEquals(30, doPuts(30, FAMILY, QUALIFIER, tables[1]));
|
||||
assertEquals(30, doGets(30, tables[1]));
|
||||
|
||||
// wait a minute and you should get other 6 requests executed
|
||||
waitMinuteQuota();
|
||||
|
||||
// Add 6req/min limit for write request, too
|
||||
admin.setQuota(QuotaSettingsFactory
|
||||
.throttleTable(TABLE_NAMES[0], ThrottleType.WRITE_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerTableCacheRefresh(false, TABLE_NAMES[0]);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0], ThrottleType.WRITE_NUMBER, 6,
|
||||
TimeUnit.MINUTES));
|
||||
triggerTableCacheRefresh(TEST_UTIL, false, TABLE_NAMES[0]);
|
||||
|
||||
// should execute at max 6 read requests and at max 6 write requests
|
||||
assertEquals(6, doGets(100, tables[0]));
|
||||
assertEquals(6, doPuts(100, tables[0]));
|
||||
assertEquals(6, doPuts(100, FAMILY, QUALIFIER, tables[0]));
|
||||
// should have no limits on tables[1]
|
||||
assertEquals(30, doPuts(30, tables[1]));
|
||||
assertEquals(30, doPuts(30, FAMILY, QUALIFIER, tables[1]));
|
||||
assertEquals(30, doGets(30, tables[1]));
|
||||
|
||||
// Remove all the limits
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0]));
|
||||
triggerTableCacheRefresh(true, TABLE_NAMES[0]);
|
||||
triggerTableCacheRefresh(TEST_UTIL, true, TABLE_NAMES[0]);
|
||||
assertEquals(80, doGets(80, tables[0], tables[1]));
|
||||
}
|
||||
|
||||
|
@ -366,20 +357,20 @@ public class TestQuotaThrottle {
|
|||
final String NAMESPACE = "default";
|
||||
|
||||
// Add 6req/min limit
|
||||
admin.setQuota(QuotaSettingsFactory
|
||||
.throttleNamespace(NAMESPACE, ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerNamespaceCacheRefresh(false, TABLE_NAMES[0]);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleNamespace(NAMESPACE, ThrottleType.REQUEST_NUMBER, 6,
|
||||
TimeUnit.MINUTES));
|
||||
triggerNamespaceCacheRefresh(TEST_UTIL, false, TABLE_NAMES[0]);
|
||||
|
||||
// should execute at max 6 requests
|
||||
assertEquals(6, doPuts(100, tables[0]));
|
||||
assertEquals(6, doPuts(100, FAMILY, QUALIFIER, tables[0]));
|
||||
|
||||
// wait a minute and you should get other 6 requests executed
|
||||
waitMinuteQuota();
|
||||
assertEquals(6, doPuts(100, tables[1]));
|
||||
assertEquals(6, doPuts(100, FAMILY, QUALIFIER, tables[1]));
|
||||
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleNamespace(NAMESPACE));
|
||||
triggerNamespaceCacheRefresh(true, TABLE_NAMES[0]);
|
||||
assertEquals(40, doPuts(40, tables[0]));
|
||||
triggerNamespaceCacheRefresh(TEST_UTIL, true, TABLE_NAMES[0]);
|
||||
assertEquals(40, doPuts(40, FAMILY, QUALIFIER, tables[0]));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -388,29 +379,29 @@ public class TestQuotaThrottle {
|
|||
final String NAMESPACE = "default";
|
||||
|
||||
// Add 6req/min limit for write request
|
||||
admin.setQuota(QuotaSettingsFactory
|
||||
.throttleNamespace(NAMESPACE, ThrottleType.WRITE_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerNamespaceCacheRefresh(false, TABLE_NAMES[0]);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleNamespace(NAMESPACE, ThrottleType.WRITE_NUMBER, 6,
|
||||
TimeUnit.MINUTES));
|
||||
triggerNamespaceCacheRefresh(TEST_UTIL, false, TABLE_NAMES[0]);
|
||||
|
||||
// should execute at max 6 write requests and no limit for read request
|
||||
assertEquals(6, doPuts(100, tables[0]));
|
||||
assertEquals(6, doPuts(100, FAMILY, QUALIFIER, tables[0]));
|
||||
assertEquals(100, doGets(100, tables[0]));
|
||||
|
||||
// wait a minute and you should get other 6 requests executed
|
||||
waitMinuteQuota();
|
||||
|
||||
// Add 6req/min limit for read request, too
|
||||
admin.setQuota(QuotaSettingsFactory
|
||||
.throttleNamespace(NAMESPACE, ThrottleType.READ_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerNamespaceCacheRefresh(false, TABLE_NAMES[0]);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleNamespace(NAMESPACE, ThrottleType.READ_NUMBER, 6,
|
||||
TimeUnit.MINUTES));
|
||||
triggerNamespaceCacheRefresh(TEST_UTIL, false, TABLE_NAMES[0]);
|
||||
|
||||
// should execute at max 6 write requests and at max 6 read requests
|
||||
assertEquals(6, doPuts(100, tables[0]));
|
||||
assertEquals(6, doPuts(100, FAMILY, QUALIFIER, tables[0]));
|
||||
assertEquals(6, doGets(100, tables[0]));
|
||||
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleNamespace(NAMESPACE));
|
||||
triggerNamespaceCacheRefresh(true, TABLE_NAMES[0]);
|
||||
assertEquals(40, doPuts(40, tables[0]));
|
||||
triggerNamespaceCacheRefresh(TEST_UTIL, true, TABLE_NAMES[0]);
|
||||
assertEquals(40, doPuts(40, FAMILY, QUALIFIER, tables[0]));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -419,21 +410,21 @@ public class TestQuotaThrottle {
|
|||
final String userName = User.getCurrent().getShortName();
|
||||
|
||||
// Add 6req/min limit for the user on tables[0]
|
||||
admin.setQuota(QuotaSettingsFactory
|
||||
.throttleUser(userName, TABLE_NAMES[0], ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(false, TABLE_NAMES[0]);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, TABLE_NAMES[0],
|
||||
ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAMES[0]);
|
||||
// Add 12req/min limit for the user
|
||||
admin.setQuota(QuotaSettingsFactory
|
||||
.throttleUser(userName, ThrottleType.REQUEST_NUMBER, 12, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(false, TABLE_NAMES[1], TABLE_NAMES[2]);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_NUMBER, 12,
|
||||
TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAMES[1], TABLE_NAMES[2]);
|
||||
// Add 8req/min limit for the tables[1]
|
||||
admin.setQuota(QuotaSettingsFactory
|
||||
.throttleTable(TABLE_NAMES[1], ThrottleType.REQUEST_NUMBER, 8, TimeUnit.MINUTES));
|
||||
triggerTableCacheRefresh(false, TABLE_NAMES[1]);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[1], ThrottleType.REQUEST_NUMBER,
|
||||
8, TimeUnit.MINUTES));
|
||||
triggerTableCacheRefresh(TEST_UTIL, false, TABLE_NAMES[1]);
|
||||
// Add a lower table level throttle on tables[0]
|
||||
admin.setQuota(QuotaSettingsFactory
|
||||
.throttleTable(TABLE_NAMES[0], ThrottleType.REQUEST_NUMBER, 3, TimeUnit.MINUTES));
|
||||
triggerTableCacheRefresh(false, TABLE_NAMES[0]);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0], ThrottleType.REQUEST_NUMBER,
|
||||
3, TimeUnit.MINUTES));
|
||||
triggerTableCacheRefresh(TEST_UTIL, false, TABLE_NAMES[0]);
|
||||
|
||||
// should execute at max 12 requests
|
||||
assertEquals(12, doGets(100, tables[2]));
|
||||
|
@ -444,20 +435,20 @@ public class TestQuotaThrottle {
|
|||
|
||||
// should execute at max 3 requests
|
||||
waitMinuteQuota();
|
||||
assertEquals(3, doPuts(100, tables[0]));
|
||||
assertEquals(3, doPuts(100, FAMILY, QUALIFIER, tables[0]));
|
||||
|
||||
// Remove all the throttling rules
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName, TABLE_NAMES[0]));
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
|
||||
triggerUserCacheRefresh(true, TABLE_NAMES[0], TABLE_NAMES[1]);
|
||||
triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAMES[0], TABLE_NAMES[1]);
|
||||
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[1]));
|
||||
triggerTableCacheRefresh(true, TABLE_NAMES[1]);
|
||||
triggerTableCacheRefresh(TEST_UTIL, true, TABLE_NAMES[1]);
|
||||
waitMinuteQuota();
|
||||
assertEquals(40, doGets(40, tables[1]));
|
||||
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0]));
|
||||
triggerTableCacheRefresh(true, TABLE_NAMES[0]);
|
||||
triggerTableCacheRefresh(TEST_UTIL, true, TABLE_NAMES[0]);
|
||||
waitMinuteQuota();
|
||||
assertEquals(40, doGets(40, tables[0]));
|
||||
}
|
||||
|
@ -469,24 +460,24 @@ public class TestQuotaThrottle {
|
|||
final String NAMESPACE = "default";
|
||||
|
||||
// Add 6req/min limit for tables[0]
|
||||
admin.setQuota(QuotaSettingsFactory
|
||||
.throttleTable(TABLE_NAMES[0], ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerTableCacheRefresh(false, TABLE_NAMES[0]);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0], ThrottleType.REQUEST_NUMBER,
|
||||
6, TimeUnit.MINUTES));
|
||||
triggerTableCacheRefresh(TEST_UTIL, false, TABLE_NAMES[0]);
|
||||
// Add 13req/min limit for the user
|
||||
admin.setQuota(QuotaSettingsFactory
|
||||
.throttleNamespace(NAMESPACE, ThrottleType.REQUEST_NUMBER, 13, TimeUnit.MINUTES));
|
||||
triggerNamespaceCacheRefresh(false, TABLE_NAMES[1]);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleNamespace(NAMESPACE, ThrottleType.REQUEST_NUMBER,
|
||||
13, TimeUnit.MINUTES));
|
||||
triggerNamespaceCacheRefresh(TEST_UTIL, false, TABLE_NAMES[1]);
|
||||
|
||||
// should execute at max 6 requests on table[0] and (13 - 6) on table[1]
|
||||
assertEquals(6, doPuts(100, tables[0]));
|
||||
assertEquals(6, doPuts(100, FAMILY, QUALIFIER, tables[0]));
|
||||
assertEquals(7, doGets(100, tables[1]));
|
||||
waitMinuteQuota();
|
||||
|
||||
// Set the global bypass for the user
|
||||
admin.setQuota(QuotaSettingsFactory.bypassGlobals(userName, true));
|
||||
admin.setQuota(QuotaSettingsFactory
|
||||
.throttleUser(userName, TABLE_NAMES[2], ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(false, TABLE_NAMES[2]);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleUser(userName, TABLE_NAMES[2],
|
||||
ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));
|
||||
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAMES[2]);
|
||||
assertEquals(30, doGets(30, tables[0]));
|
||||
assertEquals(30, doGets(30, tables[1]));
|
||||
waitMinuteQuota();
|
||||
|
@ -495,16 +486,16 @@ public class TestQuotaThrottle {
|
|||
// should execute at max 6 requests on table[0] and (13 - 6) on table[1]
|
||||
admin.setQuota(QuotaSettingsFactory.bypassGlobals(userName, false));
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName, TABLE_NAMES[2]));
|
||||
triggerUserCacheRefresh(true, TABLE_NAMES[2]);
|
||||
assertEquals(6, doPuts(100, tables[0]));
|
||||
triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAMES[2]);
|
||||
assertEquals(6, doPuts(100, FAMILY, QUALIFIER, tables[0]));
|
||||
assertEquals(7, doGets(100, tables[1]));
|
||||
|
||||
// unset throttle
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0]));
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleNamespace(NAMESPACE));
|
||||
waitMinuteQuota();
|
||||
triggerTableCacheRefresh(true, TABLE_NAMES[0]);
|
||||
triggerNamespaceCacheRefresh(true, TABLE_NAMES[1]);
|
||||
triggerTableCacheRefresh(TEST_UTIL, true, TABLE_NAMES[0]);
|
||||
triggerNamespaceCacheRefresh(TEST_UTIL, true, TABLE_NAMES[1]);
|
||||
assertEquals(30, doGets(30, tables[0]));
|
||||
assertEquals(30, doGets(30, tables[1]));
|
||||
}
|
||||
|
@ -516,18 +507,18 @@ public class TestQuotaThrottle {
|
|||
// Add 6CU/min limit
|
||||
admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0],
|
||||
ThrottleType.WRITE_CAPACITY_UNIT, 6, TimeUnit.MINUTES));
|
||||
triggerTableCacheRefresh(false, TABLE_NAMES[0]);
|
||||
triggerTableCacheRefresh(TEST_UTIL, false, TABLE_NAMES[0]);
|
||||
|
||||
// should execute at max 6 capacity units because each put size is 1 capacity unit
|
||||
assertEquals(6, doPuts(20, 10, tables[0]));
|
||||
assertEquals(6, doPuts(20, 10, FAMILY, QUALIFIER, tables[0]));
|
||||
|
||||
// wait a minute and you should execute at max 3 capacity units because each put size is 2
|
||||
// capacity unit
|
||||
waitMinuteQuota();
|
||||
assertEquals(3, doPuts(20, 1025, tables[0]));
|
||||
assertEquals(3, doPuts(20, 1025, FAMILY, QUALIFIER, tables[0]));
|
||||
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0]));
|
||||
triggerTableCacheRefresh(true, TABLE_NAMES[0]);
|
||||
triggerTableCacheRefresh(TEST_UTIL, true, TABLE_NAMES[0]);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -537,20 +528,20 @@ public class TestQuotaThrottle {
|
|||
// Add 6CU/min limit
|
||||
admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0],
|
||||
ThrottleType.READ_CAPACITY_UNIT, 6, TimeUnit.MINUTES));
|
||||
triggerTableCacheRefresh(false, TABLE_NAMES[0]);
|
||||
triggerTableCacheRefresh(TEST_UTIL, false, TABLE_NAMES[0]);
|
||||
|
||||
assertEquals(20, doPuts(20, 10, tables[0]));
|
||||
assertEquals(20, doPuts(20, 10, FAMILY, QUALIFIER, tables[0]));
|
||||
// should execute at max 6 capacity units because each get size is 1 capacity unit
|
||||
assertEquals(6, doGets(20, tables[0]));
|
||||
|
||||
assertEquals(20, doPuts(20, 2015, tables[0]));
|
||||
assertEquals(20, doPuts(20, 2015, FAMILY, QUALIFIER, tables[0]));
|
||||
// wait a minute and you should execute at max 3 capacity units because each get size is 2
|
||||
// capacity unit on tables[0]
|
||||
waitMinuteQuota();
|
||||
assertEquals(3, doGets(20, tables[0]));
|
||||
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0]));
|
||||
triggerTableCacheRefresh(true, TABLE_NAMES[0]);
|
||||
triggerTableCacheRefresh(TEST_UTIL, true, TABLE_NAMES[0]);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -558,16 +549,16 @@ public class TestQuotaThrottle {
|
|||
final Admin admin = TEST_UTIL.getAdmin();
|
||||
|
||||
// Add throttle quota
|
||||
admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0],
|
||||
ThrottleType.REQUEST_NUMBER, 100, TimeUnit.MINUTES));
|
||||
triggerTableCacheRefresh(false, TABLE_NAMES[0]);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0], ThrottleType.REQUEST_NUMBER,
|
||||
100, TimeUnit.MINUTES));
|
||||
triggerTableCacheRefresh(TEST_UTIL, false, TABLE_NAMES[0]);
|
||||
|
||||
Table table = TEST_UTIL.getConnection().getTable(TABLE_NAMES[0]);
|
||||
// An exists call when having throttle quota
|
||||
table.exists(new Get(Bytes.toBytes("abc")));
|
||||
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0]));
|
||||
triggerTableCacheRefresh(true, TABLE_NAMES[0]);
|
||||
triggerTableCacheRefresh(TEST_UTIL, true, TABLE_NAMES[0]);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -579,22 +570,27 @@ public class TestQuotaThrottle {
|
|||
// requests are throttled by table quota
|
||||
admin.setQuota(QuotaSettingsFactory.throttleRegionServer(
|
||||
QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, ThrottleType.WRITE_NUMBER, 7, TimeUnit.MINUTES));
|
||||
triggerTableCacheRefresh(false, TABLE_NAMES[0]);
|
||||
triggerRegionServerCacheRefresh(false);
|
||||
assertEquals(5, doPuts(10, tables[0]));
|
||||
triggerTableCacheRefresh(TEST_UTIL, false, TABLE_NAMES[0]);
|
||||
triggerRegionServerCacheRefresh(TEST_UTIL, false);
|
||||
assertEquals(5, doPuts(10, FAMILY, QUALIFIER, tables[0]));
|
||||
triggerRegionServerCacheRefresh(TEST_UTIL, false);
|
||||
assertEquals(5, doPuts(10, FAMILY, QUALIFIER, tables[0]));
|
||||
|
||||
// requests are throttled by region server quota
|
||||
admin.setQuota(QuotaSettingsFactory.throttleRegionServer(
|
||||
QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, ThrottleType.WRITE_NUMBER, 4, TimeUnit.MINUTES));
|
||||
triggerRegionServerCacheRefresh(false);
|
||||
assertEquals(4, doPuts(10, tables[0]));
|
||||
triggerRegionServerCacheRefresh(TEST_UTIL, false);
|
||||
assertEquals(4, doPuts(10, FAMILY, QUALIFIER, tables[0]));
|
||||
triggerRegionServerCacheRefresh(TEST_UTIL, false);
|
||||
assertEquals(4, doPuts(10, FAMILY, QUALIFIER, tables[0]));
|
||||
|
||||
// unthrottle
|
||||
admin.setQuota(
|
||||
QuotaSettingsFactory.unthrottleRegionServer(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY));
|
||||
triggerRegionServerCacheRefresh(TEST_UTIL, true);
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0]));
|
||||
triggerTableCacheRefresh(true, TABLE_NAMES[0]);
|
||||
triggerRegionServerCacheRefresh(true);
|
||||
triggerTableCacheRefresh(TEST_UTIL, true, TABLE_NAMES[0]);
|
||||
triggerRegionServerCacheRefresh(TEST_UTIL, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -602,195 +598,57 @@ public class TestQuotaThrottle {
|
|||
final Admin admin = TEST_UTIL.getAdmin();
|
||||
admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0], ThrottleType.WRITE_NUMBER, 5,
|
||||
TimeUnit.MINUTES));
|
||||
triggerTableCacheRefresh(false, TABLE_NAMES[0]);
|
||||
triggerTableCacheRefresh(TEST_UTIL, false, TABLE_NAMES[0]);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleRegionServer(
|
||||
QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, ThrottleType.WRITE_NUMBER, 20, TimeUnit.SECONDS));
|
||||
admin.setQuota(QuotaSettingsFactory.throttleRegionServer(
|
||||
QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, ThrottleType.READ_NUMBER, 10, TimeUnit.SECONDS));
|
||||
triggerRegionServerCacheRefresh(false);
|
||||
triggerRegionServerCacheRefresh(TEST_UTIL, false);
|
||||
|
||||
// enable exceed throttle quota
|
||||
admin.exceedThrottleQuotaSwitch(true);
|
||||
// exceed table limit and allowed by region server limit
|
||||
triggerExceedThrottleQuotaCacheRefresh(true);
|
||||
triggerExceedThrottleQuotaCacheRefresh(TEST_UTIL, true);
|
||||
waitMinuteQuota();
|
||||
assertEquals(10, doPuts(10, tables[0]));
|
||||
assertEquals(10, doPuts(10, FAMILY, QUALIFIER, tables[0]));
|
||||
// exceed table limit and throttled by region server limit
|
||||
waitMinuteQuota();
|
||||
assertEquals(20, doPuts(25, tables[0]));
|
||||
assertEquals(20, doPuts(25, FAMILY, QUALIFIER, tables[0]));
|
||||
|
||||
// set region server limiter is lower than table limiter
|
||||
admin.setQuota(QuotaSettingsFactory.throttleRegionServer(
|
||||
QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, ThrottleType.WRITE_NUMBER, 2, TimeUnit.SECONDS));
|
||||
triggerRegionServerCacheRefresh(false);
|
||||
triggerRegionServerCacheRefresh(TEST_UTIL, false);
|
||||
// throttled by region server limiter
|
||||
waitMinuteQuota();
|
||||
assertEquals(2, doPuts(10, tables[0]));
|
||||
assertEquals(2, doPuts(10, FAMILY, QUALIFIER, tables[0]));
|
||||
admin.setQuota(QuotaSettingsFactory.throttleRegionServer(
|
||||
QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, ThrottleType.WRITE_NUMBER, 20, TimeUnit.SECONDS));
|
||||
triggerRegionServerCacheRefresh(false);
|
||||
triggerRegionServerCacheRefresh(TEST_UTIL, false);
|
||||
|
||||
// disable exceed throttle quota
|
||||
admin.exceedThrottleQuotaSwitch(false);
|
||||
triggerExceedThrottleQuotaCacheRefresh(false);
|
||||
triggerExceedThrottleQuotaCacheRefresh(TEST_UTIL, false);
|
||||
waitMinuteQuota();
|
||||
// throttled by table limit
|
||||
assertEquals(5, doPuts(10, tables[0]));
|
||||
assertEquals(5, doPuts(10, FAMILY, QUALIFIER, tables[0]));
|
||||
|
||||
// enable exceed throttle quota and unthrottle region server
|
||||
admin.exceedThrottleQuotaSwitch(true);
|
||||
triggerExceedThrottleQuotaCacheRefresh(true);
|
||||
triggerExceedThrottleQuotaCacheRefresh(TEST_UTIL, true);
|
||||
waitMinuteQuota();
|
||||
admin.setQuota(
|
||||
QuotaSettingsFactory.unthrottleRegionServer(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY));
|
||||
triggerRegionServerCacheRefresh(true);
|
||||
triggerRegionServerCacheRefresh(TEST_UTIL, true);
|
||||
waitMinuteQuota();
|
||||
// throttled by table limit
|
||||
assertEquals(5, doPuts(10, tables[0]));
|
||||
assertEquals(5, doPuts(10, FAMILY, QUALIFIER, tables[0]));
|
||||
|
||||
// disable exceed throttle quota
|
||||
admin.exceedThrottleQuotaSwitch(false);
|
||||
triggerExceedThrottleQuotaCacheRefresh(false);
|
||||
triggerExceedThrottleQuotaCacheRefresh(TEST_UTIL, false);
|
||||
// unthrottle table
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0]));
|
||||
triggerTableCacheRefresh(true, TABLE_NAMES[0]);
|
||||
}
|
||||
|
||||
private int doPuts(int maxOps, final Table... tables) throws Exception {
|
||||
return doPuts(maxOps, -1, tables);
|
||||
}
|
||||
|
||||
private int doPuts(int maxOps, int valueSize, final Table... tables) throws Exception {
|
||||
int count = 0;
|
||||
try {
|
||||
while (count < maxOps) {
|
||||
Put put = new Put(Bytes.toBytes("row-" + count));
|
||||
byte[] value;
|
||||
if (valueSize < 0) {
|
||||
value = Bytes.toBytes("data-" + count);
|
||||
} else {
|
||||
value = generateValue(valueSize);
|
||||
}
|
||||
put.addColumn(FAMILY, QUALIFIER, value);
|
||||
for (final Table table : tables) {
|
||||
table.put(put);
|
||||
}
|
||||
count += tables.length;
|
||||
}
|
||||
} catch (RpcThrottlingException e) {
|
||||
LOG.error("put failed after nRetries=" + count, e);
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
private byte[] generateValue(int valueSize) {
|
||||
byte[] bytes = new byte[valueSize];
|
||||
for (int i = 0; i < valueSize; i++) {
|
||||
bytes[i] = 'a';
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
private long doGets(int maxOps, final Table... tables) throws Exception {
|
||||
int count = 0;
|
||||
try {
|
||||
while (count < maxOps) {
|
||||
Get get = new Get(Bytes.toBytes("row-" + count));
|
||||
for (final Table table: tables) {
|
||||
table.get(get);
|
||||
}
|
||||
count += tables.length;
|
||||
}
|
||||
} catch (RpcThrottlingException e) {
|
||||
LOG.error("get failed after nRetries=" + count, e);
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
private void triggerUserCacheRefresh(boolean bypass, TableName... tables) throws Exception {
|
||||
triggerCacheRefresh(bypass, true, false, false, false, false, tables);
|
||||
}
|
||||
|
||||
private void triggerTableCacheRefresh(boolean bypass, TableName... tables) throws Exception {
|
||||
triggerCacheRefresh(bypass, false, true, false, false, false, tables);
|
||||
}
|
||||
|
||||
private void triggerNamespaceCacheRefresh(boolean bypass, TableName... tables) throws Exception {
|
||||
triggerCacheRefresh(bypass, false, false, true, false, false, tables);
|
||||
}
|
||||
|
||||
private void triggerRegionServerCacheRefresh(boolean bypass) throws Exception {
|
||||
triggerCacheRefresh(bypass, false, false, false, true, false);
|
||||
}
|
||||
|
||||
private void triggerExceedThrottleQuotaCacheRefresh(boolean exceedEnabled) throws Exception {
|
||||
triggerCacheRefresh(exceedEnabled, false, false, false, false, true);
|
||||
}
|
||||
|
||||
private void triggerCacheRefresh(boolean bypass, boolean userLimiter, boolean tableLimiter,
|
||||
boolean nsLimiter, boolean rsLimiter, boolean exceedThrottleQuota, final TableName... tables)
|
||||
throws Exception {
|
||||
envEdge.incValue(2 * REFRESH_TIME);
|
||||
for (RegionServerThread rst : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
|
||||
RegionServerRpcQuotaManager quotaManager =
|
||||
rst.getRegionServer().getRegionServerRpcQuotaManager();
|
||||
QuotaCache quotaCache = quotaManager.getQuotaCache();
|
||||
|
||||
quotaCache.triggerCacheRefresh();
|
||||
// sleep for cache update
|
||||
Thread.sleep(250);
|
||||
|
||||
for (TableName table : tables) {
|
||||
quotaCache.getTableLimiter(table);
|
||||
}
|
||||
|
||||
boolean isUpdated = false;
|
||||
while (!isUpdated) {
|
||||
quotaCache.triggerCacheRefresh();
|
||||
isUpdated = true;
|
||||
for (TableName table : tables) {
|
||||
boolean isBypass = true;
|
||||
if (userLimiter) {
|
||||
isBypass &= quotaCache.getUserLimiter(User.getCurrent().getUGI(), table).isBypass();
|
||||
}
|
||||
if (tableLimiter) {
|
||||
isBypass &= quotaCache.getTableLimiter(table).isBypass();
|
||||
}
|
||||
if (nsLimiter) {
|
||||
isBypass &= quotaCache.getNamespaceLimiter(table.getNamespaceAsString()).isBypass();
|
||||
}
|
||||
if (isBypass != bypass) {
|
||||
envEdge.incValue(100);
|
||||
isUpdated = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (rsLimiter) {
|
||||
boolean rsIsBypass = quotaCache
|
||||
.getRegionServerQuotaLimiter(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY).isBypass();
|
||||
if (rsIsBypass != bypass) {
|
||||
envEdge.incValue(100);
|
||||
isUpdated = false;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (exceedThrottleQuota) {
|
||||
if (quotaCache.isExceedThrottleQuotaEnabled() != bypass) {
|
||||
envEdge.incValue(100);
|
||||
isUpdated = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LOG.debug("QuotaCache");
|
||||
LOG.debug(Objects.toString(quotaCache.getNamespaceQuotaCache()));
|
||||
LOG.debug(Objects.toString(quotaCache.getTableQuotaCache()));
|
||||
LOG.debug(Objects.toString(quotaCache.getUserQuotaCache()));
|
||||
LOG.debug(Objects.toString(quotaCache.getRegionServerQuotaCache()));
|
||||
}
|
||||
}
|
||||
|
||||
private void waitMinuteQuota() {
|
||||
envEdge.incValue(70000);
|
||||
triggerTableCacheRefresh(TEST_UTIL, true, TABLE_NAMES[0]);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,207 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
||||
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public final class ThrottleQuotaTestUtil {
|
||||
|
||||
private final static Logger LOG = LoggerFactory.getLogger(ThrottleQuotaTestUtil.class);
|
||||
private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge();
|
||||
private final static int REFRESH_TIME = 30 * 60000;
|
||||
static {
|
||||
envEdge.setValue(EnvironmentEdgeManager.currentTime());
|
||||
EnvironmentEdgeManagerTestHelper.injectEdge(envEdge);
|
||||
}
|
||||
|
||||
private ThrottleQuotaTestUtil() {
|
||||
// Hide utility class constructor
|
||||
LOG.debug("Call constructor of ThrottleQuotaTestUtil");
|
||||
}
|
||||
|
||||
static int doPuts(int maxOps, byte[] family, byte[] qualifier, final Table... tables) {
|
||||
return doPuts(maxOps, -1, family, qualifier, tables);
|
||||
}
|
||||
|
||||
static int doPuts(int maxOps, int valueSize, byte[] family, byte[] qualifier,
|
||||
final Table... tables) {
|
||||
int count = 0;
|
||||
try {
|
||||
while (count < maxOps) {
|
||||
Put put = new Put(Bytes.toBytes("row-" + count));
|
||||
byte[] value;
|
||||
if (valueSize < 0) {
|
||||
value = Bytes.toBytes("data-" + count);
|
||||
} else {
|
||||
value = generateValue(valueSize);
|
||||
}
|
||||
put.addColumn(family, qualifier, value);
|
||||
for (final Table table : tables) {
|
||||
table.put(put);
|
||||
}
|
||||
count += tables.length;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("put failed after nRetries=" + count, e);
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
private static byte[] generateValue(int valueSize) {
|
||||
byte[] bytes = new byte[valueSize];
|
||||
for (int i = 0; i < valueSize; i++) {
|
||||
bytes[i] = 'a';
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
static long doGets(int maxOps, final Table... tables) {
|
||||
int count = 0;
|
||||
try {
|
||||
while (count < maxOps) {
|
||||
Get get = new Get(Bytes.toBytes("row-" + count));
|
||||
for (final Table table : tables) {
|
||||
table.get(get);
|
||||
}
|
||||
count += tables.length;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("get failed after nRetries=" + count, e);
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
static void triggerUserCacheRefresh(HBaseTestingUtility testUtil, boolean bypass,
|
||||
TableName... tables) throws Exception {
|
||||
triggerCacheRefresh(testUtil, bypass, true, false, false, false, false, tables);
|
||||
}
|
||||
|
||||
static void triggerTableCacheRefresh(HBaseTestingUtility testUtil, boolean bypass,
|
||||
TableName... tables) throws Exception {
|
||||
triggerCacheRefresh(testUtil, bypass, false, true, false, false, false, tables);
|
||||
}
|
||||
|
||||
static void triggerNamespaceCacheRefresh(HBaseTestingUtility testUtil, boolean bypass,
|
||||
TableName... tables) throws Exception {
|
||||
triggerCacheRefresh(testUtil, bypass, false, false, true, false, false, tables);
|
||||
}
|
||||
|
||||
static void triggerRegionServerCacheRefresh(HBaseTestingUtility testUtil, boolean bypass)
|
||||
throws Exception {
|
||||
triggerCacheRefresh(testUtil, bypass, false, false, false, true, false);
|
||||
}
|
||||
|
||||
static void triggerExceedThrottleQuotaCacheRefresh(HBaseTestingUtility testUtil,
|
||||
boolean exceedEnabled) throws Exception {
|
||||
triggerCacheRefresh(testUtil, exceedEnabled, false, false, false, false, true);
|
||||
}
|
||||
|
||||
private static void triggerCacheRefresh(HBaseTestingUtility testUtil, boolean bypass,
|
||||
boolean userLimiter, boolean tableLimiter, boolean nsLimiter, boolean rsLimiter,
|
||||
boolean exceedThrottleQuota, final TableName... tables) throws Exception {
|
||||
envEdge.incValue(2 * REFRESH_TIME);
|
||||
for (RegionServerThread rst : testUtil.getMiniHBaseCluster().getRegionServerThreads()) {
|
||||
RegionServerRpcQuotaManager quotaManager =
|
||||
rst.getRegionServer().getRegionServerRpcQuotaManager();
|
||||
QuotaCache quotaCache = quotaManager.getQuotaCache();
|
||||
|
||||
quotaCache.triggerCacheRefresh();
|
||||
// sleep for cache update
|
||||
Thread.sleep(250);
|
||||
|
||||
for (TableName table : tables) {
|
||||
quotaCache.getTableLimiter(table);
|
||||
}
|
||||
|
||||
boolean isUpdated = false;
|
||||
while (!isUpdated) {
|
||||
quotaCache.triggerCacheRefresh();
|
||||
isUpdated = true;
|
||||
for (TableName table : tables) {
|
||||
boolean isBypass = true;
|
||||
if (userLimiter) {
|
||||
isBypass = quotaCache.getUserLimiter(User.getCurrent().getUGI(), table).isBypass();
|
||||
}
|
||||
if (tableLimiter) {
|
||||
isBypass &= quotaCache.getTableLimiter(table).isBypass();
|
||||
}
|
||||
if (nsLimiter) {
|
||||
isBypass &= quotaCache.getNamespaceLimiter(table.getNamespaceAsString()).isBypass();
|
||||
}
|
||||
if (isBypass != bypass) {
|
||||
envEdge.incValue(100);
|
||||
isUpdated = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (rsLimiter) {
|
||||
boolean rsIsBypass = quotaCache
|
||||
.getRegionServerQuotaLimiter(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY).isBypass();
|
||||
if (rsIsBypass != bypass) {
|
||||
envEdge.incValue(100);
|
||||
isUpdated = false;
|
||||
}
|
||||
}
|
||||
if (exceedThrottleQuota) {
|
||||
if (quotaCache.isExceedThrottleQuotaEnabled() != bypass) {
|
||||
envEdge.incValue(100);
|
||||
isUpdated = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LOG.debug("QuotaCache");
|
||||
LOG.debug(Objects.toString(quotaCache.getNamespaceQuotaCache()));
|
||||
LOG.debug(Objects.toString(quotaCache.getTableQuotaCache()));
|
||||
LOG.debug(Objects.toString(quotaCache.getUserQuotaCache()));
|
||||
LOG.debug(Objects.toString(quotaCache.getRegionServerQuotaCache()));
|
||||
}
|
||||
}
|
||||
|
||||
static void waitMinuteQuota() {
|
||||
envEdge.incValue(70000);
|
||||
}
|
||||
|
||||
static void clearQuotaCache(HBaseTestingUtility testUtil) {
|
||||
for (RegionServerThread rst : testUtil.getMiniHBaseCluster().getRegionServerThreads()) {
|
||||
RegionServerRpcQuotaManager quotaManager =
|
||||
rst.getRegionServer().getRegionServerRpcQuotaManager();
|
||||
QuotaCache quotaCache = quotaManager.getQuotaCache();
|
||||
quotaCache.getNamespaceQuotaCache().clear();
|
||||
quotaCache.getTableQuotaCache().clear();
|
||||
quotaCache.getUserQuotaCache().clear();
|
||||
quotaCache.getRegionServerQuotaCache().clear();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -24,6 +24,7 @@ java_import org.apache.hadoop.hbase.ServerName
|
|||
java_import org.apache.hadoop.hbase.quotas.ThrottleType
|
||||
java_import org.apache.hadoop.hbase.quotas.QuotaFilter
|
||||
java_import org.apache.hadoop.hbase.quotas.QuotaRetriever
|
||||
java_import org.apache.hadoop.hbase.quotas.QuotaScope
|
||||
java_import org.apache.hadoop.hbase.quotas.QuotaSettingsFactory
|
||||
java_import org.apache.hadoop.hbase.quotas.QuotaTableUtil
|
||||
java_import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy
|
||||
|
@ -36,6 +37,9 @@ module HBaseQuotasConstants
|
|||
REQUEST = 'REQUEST'.freeze
|
||||
WRITE = 'WRITE'.freeze
|
||||
READ = 'READ'.freeze
|
||||
SCOPE = 'SCOPE'.freeze
|
||||
CLUSTER = 'CLUSTER'.freeze
|
||||
MACHINE = 'MACHINE'.freeze
|
||||
# Space quota constants
|
||||
SPACE = 'SPACE'.freeze
|
||||
NO_INSERTS = 'NO_INSERTS'.freeze
|
||||
|
@ -60,30 +64,35 @@ module Hbase
|
|||
type = args.fetch(THROTTLE_TYPE, REQUEST)
|
||||
args.delete(THROTTLE_TYPE)
|
||||
type, limit, time_unit = _parse_limit(args.delete(LIMIT), ThrottleType, type)
|
||||
scope = _parse_scope(args.fetch(SCOPE, MACHINE))
|
||||
args.delete(SCOPE)
|
||||
if args.key?(USER)
|
||||
user = args.delete(USER)
|
||||
if args.key?(TABLE)
|
||||
table = TableName.valueOf(args.delete(TABLE))
|
||||
raise(ArgumentError, 'Unexpected arguments: ' + args.inspect) unless args.empty?
|
||||
settings = QuotaSettingsFactory.throttleUser(user, table, type, limit, time_unit)
|
||||
settings = QuotaSettingsFactory.throttleUser(user, table, type, limit, time_unit, scope)
|
||||
elsif args.key?(NAMESPACE)
|
||||
namespace = args.delete(NAMESPACE)
|
||||
raise(ArgumentError, 'Unexpected arguments: ' + args.inspect) unless args.empty?
|
||||
settings = QuotaSettingsFactory.throttleUser(user, namespace, type, limit, time_unit)
|
||||
settings = QuotaSettingsFactory.throttleUser(user, namespace, type, limit, time_unit, scope)
|
||||
else
|
||||
raise(ArgumentError, 'Unexpected arguments: ' + args.inspect) unless args.empty?
|
||||
settings = QuotaSettingsFactory.throttleUser(user, type, limit, time_unit)
|
||||
settings = QuotaSettingsFactory.throttleUser(user, type, limit, time_unit, scope)
|
||||
end
|
||||
elsif args.key?(TABLE)
|
||||
table = TableName.valueOf(args.delete(TABLE))
|
||||
raise(ArgumentError, 'Unexpected arguments: ' + args.inspect) unless args.empty?
|
||||
settings = QuotaSettingsFactory.throttleTable(table, type, limit, time_unit)
|
||||
settings = QuotaSettingsFactory.throttleTable(table, type, limit, time_unit, scope)
|
||||
elsif args.key?(NAMESPACE)
|
||||
namespace = args.delete(NAMESPACE)
|
||||
raise(ArgumentError, 'Unexpected arguments: ' + args.inspect) unless args.empty?
|
||||
settings = QuotaSettingsFactory.throttleNamespace(namespace, type, limit, time_unit)
|
||||
settings = QuotaSettingsFactory.throttleNamespace(namespace, type, limit, time_unit, scope)
|
||||
elsif args.key?(REGIONSERVER)
|
||||
# TODO: Setting specified region server quota isn't supported currently and using 'all' for all RS
|
||||
if scope == QuotaScope.valueOf(CLUSTER)
|
||||
raise(ArgumentError, 'Invalid region server throttle scope, must be MACHINE')
|
||||
end
|
||||
settings = QuotaSettingsFactory.throttleRegionServer('all', type, limit, time_unit)
|
||||
else
|
||||
raise 'One of USER, TABLE, NAMESPACE or REGIONSERVER must be specified'
|
||||
|
@ -325,6 +334,13 @@ module Hbase
|
|||
end
|
||||
value
|
||||
end
|
||||
|
||||
def _parse_scope(scope_str)
|
||||
scope_str = scope_str.upcase
|
||||
return QuotaScope.valueOf(scope_str) if [CLUSTER, MACHINE].include?(scope_str)
|
||||
unless raise(ArgumentError, 'Invalid throttle scope, must be either CLUSTER or MACHINE')
|
||||
end
|
||||
end
|
||||
end
|
||||
# rubocop:enable Metrics/ClassLength
|
||||
end
|
||||
|
|
|
@ -26,14 +26,22 @@ Set a quota for a user, table, namespace or region server.
|
|||
Syntax : set_quota TYPE => <type>, <args>
|
||||
|
||||
TYPE => THROTTLE
|
||||
User can either set quota on read, write or on both the requests together(i.e., read+write).
|
||||
The read, write, or read+write(default throttle type) request limit can be expressed using
|
||||
the form 100req/sec, 100req/min; the read, write, read+write(default throttle type) limit
|
||||
can be expressed using the form 100k/sec, 100M/min with (B, K, M, G, T, P) as valid size unit
|
||||
; the read, write, read+write(default throttle type) limit can be expressed using the form
|
||||
100CU/sec as capacity unit. The valid time units are (sec, min, hour, day).
|
||||
Currently the throttle limit is per machine - a limit of 100req/min
|
||||
means that each machine can execute 100req/min.
|
||||
1. User can set throttle quota for user, namespace, table, region server, user over namespace,
|
||||
user over table by USER, NAMESPACE, TABLE, REGIONSERVER keys.
|
||||
Note: Setting specified region server quota isn't supported currently and using 'all' to
|
||||
represent all region servers.
|
||||
2. User can set throttle quota type either on read, write or on both the requests together(
|
||||
read+write, default throttle type) by THROTTLE_TYPE => READ, WRITE, REQUEST.
|
||||
3. The request limit can be expressed using the form 100req/sec, 100req/min; or can be expressed
|
||||
using the form 100k/sec, 100M/min with (B, K, M, G, T, P) as valid size unit; or can be expressed
|
||||
using the form 100CU/sec as capacity unit by LIMIT key.
|
||||
The valid time units are (sec, min, hour, day).
|
||||
4. User can set throttle scope to be either MACHINE(default throttle scope) or CLUSTER by
|
||||
SCOPE => MACHINE, CLUSTER. MACHINE scope quota means the throttle limit is used by single
|
||||
region server, CLUSTER scope quota means the throttle limit is shared by all region servers.
|
||||
Region server throttle quota must be MACHINE scope.
|
||||
Note: because currently use [ClusterLimit / RsNum] to divide cluster limit to machine limit,
|
||||
so it's better to do not use cluster scope quota when you use rs group feature.
|
||||
|
||||
For example:
|
||||
|
||||
|
@ -59,6 +67,9 @@ For example:
|
|||
hbase> set_quota TYPE => THROTTLE, REGIONSERVER => 'all', THROTTLE_TYPE => WRITE, LIMIT => '20000req/sec'
|
||||
hbase> set_quota TYPE => THROTTLE, REGIONSERVER => 'all', LIMIT => NONE
|
||||
|
||||
hbase> set_quota TYPE => THROTTLE, NAMESPACE => 'ns1', LIMIT => '10req/sec', SCOPE => CLUSTER
|
||||
hbase> set_quota TYPE => THROTTLE, NAMESPACE => 'ns1', LIMIT => '10req/sec', SCOPE => MACHINE
|
||||
|
||||
hbase> set_quota USER => 'u1', GLOBAL_BYPASS => true
|
||||
|
||||
TYPE => SPACE
|
||||
|
|
|
@ -282,6 +282,23 @@ module Hbase
|
|||
assert(output.include?('Previous exceed throttle quota enabled : true'))
|
||||
command(:set_quota, TYPE => THROTTLE, REGIONSERVER => 'all', LIMIT => NONE)
|
||||
end
|
||||
|
||||
define_test 'can set and remove CLUSTER scope quota' do
|
||||
command(:set_quota, TYPE => THROTTLE, TABLE => @test_name, LIMIT => '100req/sec', SCOPE => CLUSTER)
|
||||
output = capture_stdout { command(:list_quotas) }
|
||||
assert(output.include?('LIMIT => 100req/sec'))
|
||||
assert(output.include?('SCOPE => CLUSTER'))
|
||||
|
||||
command(:set_quota, TYPE => THROTTLE, TABLE => @test_name, LIMIT => '200req/sec', SCOPE => MACHINE)
|
||||
output = capture_stdout { command(:list_quotas) }
|
||||
assert(output.include?('1 row(s)'))
|
||||
assert(output.include?('LIMIT => 200req/sec'))
|
||||
assert(output.include?('SCOPE => MACHINE'))
|
||||
|
||||
command(:set_quota, TYPE => THROTTLE, TABLE => @test_name, LIMIT => NONE)
|
||||
output = capture_stdout { command(:list_quotas) }
|
||||
assert(output.include?('0 row(s)'))
|
||||
end
|
||||
end
|
||||
# rubocop:enable Metrics/ClassLength
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue