HBASE-21783 Support exceed user/table/ns throttle quota if region server has available quota
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
761aef6d9d
commit
9e45752d6e
|
@ -2773,6 +2773,14 @@ public interface Admin extends Abortable, Closeable {
|
|||
*/
|
||||
boolean isRpcThrottleEnabled() throws IOException;
|
||||
|
||||
/**
|
||||
* Switch the exceed throttle quota. If enabled, user/table/namespace throttle quota
|
||||
* can be exceeded if region server has availble quota.
|
||||
* @param enable Set to <code>true</code> to enable, <code>false</code> to disable.
|
||||
* @return Previous exceed throttle enabled value
|
||||
*/
|
||||
boolean exceedThrottleQuotaSwitch(final boolean enable) throws IOException;
|
||||
|
||||
/**
|
||||
* Fetches the table sizes on the filesystem as tracked by the HBase Master.
|
||||
*/
|
||||
|
|
|
@ -1270,6 +1270,14 @@ public interface AsyncAdmin {
|
|||
*/
|
||||
CompletableFuture<Boolean> isRpcThrottleEnabled();
|
||||
|
||||
/**
|
||||
* Switch the exceed throttle quota. If enabled, user/table/namespace throttle quota
|
||||
* can be exceeded if region server has availble quota.
|
||||
* @param enable Set to <code>true</code> to enable, <code>false</code> to disable.
|
||||
* @return Previous exceed throttle enabled value
|
||||
*/
|
||||
CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable);
|
||||
|
||||
/**
|
||||
* Fetches the table sizes on the filesystem as tracked by the HBase Master.
|
||||
*/
|
||||
|
|
|
@ -763,6 +763,11 @@ class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
return wrap(rawAdmin.isRpcThrottleEnabled());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
|
||||
return wrap(rawAdmin.exceedThrottleQuotaSwitch(enable));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
|
||||
return wrap(rawAdmin.getSpaceQuotaTableSizes());
|
||||
|
|
|
@ -115,6 +115,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCa
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SecurityCapabilitiesResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetNormalizerRunningResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetQuotaStatesRequest;
|
||||
|
@ -1767,6 +1769,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
return stub.isRpcThrottleEnabled(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SwitchExceedThrottleQuotaResponse switchExceedThrottleQuota(RpcController controller,
|
||||
SwitchExceedThrottleQuotaRequest request) throws ServiceException {
|
||||
return stub.switchExceedThrottleQuota(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AccessControlProtos.GrantResponse grant(RpcController controller,
|
||||
AccessControlProtos.GrantRequest request) throws ServiceException {
|
||||
|
|
|
@ -4365,6 +4365,20 @@ public class HBaseAdmin implements Admin {
|
|||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean exceedThrottleQuotaSwitch(final boolean enable) throws IOException {
|
||||
return executeCallable(new MasterCallable<Boolean>(getConnection(), getRpcControllerFactory()) {
|
||||
@Override
|
||||
protected Boolean rpcCall() throws Exception {
|
||||
return this.master
|
||||
.switchExceedThrottleQuota(getRpcController(),
|
||||
MasterProtos.SwitchExceedThrottleQuotaRequest.newBuilder()
|
||||
.setExceedThrottleQuotaEnabled(enable).build())
|
||||
.getPreviousExceedThrottleQuotaEnabled();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<TableName, Long> getSpaceQuotaTableSizes() throws IOException {
|
||||
return executeCallable(
|
||||
|
|
|
@ -256,6 +256,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTable
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
|
||||
|
@ -3670,6 +3672,20 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Boolean> exceedThrottleQuotaSwitch(boolean enable) {
|
||||
CompletableFuture<Boolean> future = this.<Boolean> newMasterCaller()
|
||||
.action((controller, stub) -> this
|
||||
.<SwitchExceedThrottleQuotaRequest, SwitchExceedThrottleQuotaResponse, Boolean> call(
|
||||
controller, stub,
|
||||
SwitchExceedThrottleQuotaRequest.newBuilder().setExceedThrottleQuotaEnabled(enable)
|
||||
.build(),
|
||||
(s, c, req, done) -> s.switchExceedThrottleQuota(c, req, done),
|
||||
resp -> resp.getPreviousExceedThrottleQuotaEnabled()))
|
||||
.call();
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Map<TableName, Long>> getSpaceQuotaTableSizes() {
|
||||
return this.<Map<TableName, Long>> newMasterCaller().action((controller, stub) -> this
|
||||
|
|
|
@ -153,6 +153,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTable
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
|
||||
|
@ -660,6 +662,12 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection {
|
|||
return stub.isRpcThrottleEnabled(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SwitchExceedThrottleQuotaResponse switchExceedThrottleQuota(RpcController controller,
|
||||
SwitchExceedThrottleQuotaRequest request) throws ServiceException {
|
||||
return stub.switchExceedThrottleQuota(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GrantResponse grant(RpcController controller, GrantRequest request)
|
||||
throws ServiceException {
|
||||
|
|
|
@ -98,6 +98,8 @@ public class QuotaTableUtil {
|
|||
protected static final byte[] QUOTA_TABLE_ROW_KEY_PREFIX = Bytes.toBytes("t.");
|
||||
protected static final byte[] QUOTA_NAMESPACE_ROW_KEY_PREFIX = Bytes.toBytes("n.");
|
||||
protected static final byte[] QUOTA_REGION_SERVER_ROW_KEY_PREFIX = Bytes.toBytes("r.");
|
||||
private static final byte[] QUOTA_EXCEED_THROTTLE_QUOTA_ROW_KEY =
|
||||
Bytes.toBytes("exceedThrottleQuota");
|
||||
|
||||
/*
|
||||
* TODO: Setting specified region server quota isn't supported currently and the row key "r.all"
|
||||
|
@ -357,6 +359,11 @@ public class QuotaTableUtil {
|
|||
parseUserResult(result, visitor);
|
||||
} else if (isRegionServerRowKey(row)) {
|
||||
parseRegionServerResult(result, visitor);
|
||||
} else if (isExceedThrottleQuotaRowKey(row)) {
|
||||
// skip exceed throttle quota row key
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Skip exceedThrottleQuota row-key when parse quota result");
|
||||
}
|
||||
} else {
|
||||
LOG.warn("unexpected row-key: " + Bytes.toString(row));
|
||||
}
|
||||
|
@ -696,6 +703,10 @@ public class QuotaTableUtil {
|
|||
return getRowKeyRegEx(QUOTA_REGION_SERVER_ROW_KEY_PREFIX, regionServer);
|
||||
}
|
||||
|
||||
protected static byte[] getExceedThrottleQuotaRowKey() {
|
||||
return QUOTA_EXCEED_THROTTLE_QUOTA_ROW_KEY;
|
||||
}
|
||||
|
||||
private static String getRowKeyRegEx(final byte[] prefix, final String regex) {
|
||||
return '^' + Pattern.quote(Bytes.toString(prefix)) + regex + '$';
|
||||
}
|
||||
|
@ -722,6 +733,10 @@ public class QuotaTableUtil {
|
|||
return Bytes.startsWith(key, QUOTA_REGION_SERVER_ROW_KEY_PREFIX);
|
||||
}
|
||||
|
||||
private static boolean isExceedThrottleQuotaRowKey(final byte[] key) {
|
||||
return Bytes.equals(key, QUOTA_EXCEED_THROTTLE_QUOTA_ROW_KEY);
|
||||
}
|
||||
|
||||
protected static String getRegionServerFromRowKey(final byte[] key) {
|
||||
return Bytes.toString(key, QUOTA_REGION_SERVER_ROW_KEY_PREFIX.length);
|
||||
}
|
||||
|
|
|
@ -652,6 +652,14 @@ message IsRpcThrottleEnabledResponse {
|
|||
required bool rpc_throttle_enabled = 1;
|
||||
}
|
||||
|
||||
message SwitchExceedThrottleQuotaRequest {
|
||||
required bool exceed_throttle_quota_enabled = 1;
|
||||
}
|
||||
|
||||
message SwitchExceedThrottleQuotaResponse {
|
||||
required bool previous_exceed_throttle_quota_enabled = 1;
|
||||
}
|
||||
|
||||
service MasterService {
|
||||
/** Used by the client to get the number of regions that have received the updated schema */
|
||||
rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest)
|
||||
|
@ -1012,6 +1020,10 @@ service MasterService {
|
|||
rpc IsRpcThrottleEnabled (IsRpcThrottleEnabledRequest)
|
||||
returns (IsRpcThrottleEnabledResponse);
|
||||
|
||||
/** Turn the exceed throttle quota on or off */
|
||||
rpc SwitchExceedThrottleQuota (SwitchExceedThrottleQuotaRequest)
|
||||
returns (SwitchExceedThrottleQuotaResponse);
|
||||
|
||||
rpc Grant(GrantRequest) returns (GrantResponse);
|
||||
|
||||
rpc Revoke(RevokeRequest) returns (RevokeResponse);
|
||||
|
|
|
@ -1488,6 +1488,26 @@ public interface MasterObserver {
|
|||
final boolean rpcThrottleEnabled) throws IOException {
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before switching exceed throttle quota state.
|
||||
* @param ctx the coprocessor instance's environment
|
||||
* @param enable the exceed throttle quota value
|
||||
*/
|
||||
default void preSwitchExceedThrottleQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
final boolean enable) throws IOException {
|
||||
}
|
||||
|
||||
/**
|
||||
* Called after switching exceed throttle quota state.
|
||||
* @param ctx the coprocessor instance's environment
|
||||
* @param oldValue the previously exceed throttle quota value
|
||||
* @param newValue the newly exceed throttle quota value
|
||||
*/
|
||||
default void postSwitchExceedThrottleQuota(
|
||||
final ObserverContext<MasterCoprocessorEnvironment> ctx, final boolean oldValue,
|
||||
final boolean newValue) throws IOException {
|
||||
}
|
||||
|
||||
/**
|
||||
* Called before granting user permissions.
|
||||
* @param ctx the coprocessor instance's environment
|
||||
|
|
|
@ -1742,6 +1742,25 @@ public class MasterCoprocessorHost
|
|||
});
|
||||
}
|
||||
|
||||
public void preSwitchExceedThrottleQuota(boolean enable) throws IOException {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.preSwitchExceedThrottleQuota(this, enable);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void postSwitchExceedThrottleQuota(final boolean oldValue, final boolean newValue)
|
||||
throws IOException {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
@Override
|
||||
public void call(MasterObserver observer) throws IOException {
|
||||
observer.postSwitchExceedThrottleQuota(this, oldValue, newValue);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void preGrant(UserPermission userPermission, boolean mergeExistingPermissions)
|
||||
throws IOException {
|
||||
execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() {
|
||||
|
|
|
@ -266,6 +266,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTable
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest;
|
||||
|
@ -2476,6 +2478,17 @@ public class MasterRpcServices extends RSRpcServices
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public SwitchExceedThrottleQuotaResponse switchExceedThrottleQuota(RpcController controller,
|
||||
SwitchExceedThrottleQuotaRequest request) throws ServiceException {
|
||||
try {
|
||||
master.checkInitialized();
|
||||
return master.getMasterQuotaManager().switchExceedThrottleQuota(request);
|
||||
} catch (Exception e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GrantResponse grant(RpcController controller, GrantRequest request)
|
||||
throws ServiceException {
|
||||
|
|
|
@ -34,17 +34,27 @@ import org.apache.hadoop.hbase.client.Result;
|
|||
public class DefaultOperationQuota implements OperationQuota {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DefaultOperationQuota.class);
|
||||
|
||||
private final List<QuotaLimiter> limiters;
|
||||
protected final List<QuotaLimiter> limiters;
|
||||
private final long writeCapacityUnit;
|
||||
private final long readCapacityUnit;
|
||||
|
||||
private long writeAvailable = 0;
|
||||
private long readAvailable = 0;
|
||||
private long writeConsumed = 0;
|
||||
private long readConsumed = 0;
|
||||
private long writeCapacityUnitConsumed = 0;
|
||||
private long readCapacityUnitConsumed = 0;
|
||||
// the available read/write quota size in bytes
|
||||
protected long writeAvailable = 0;
|
||||
protected long readAvailable = 0;
|
||||
// estimated quota
|
||||
protected long writeConsumed = 0;
|
||||
protected long readConsumed = 0;
|
||||
protected long writeCapacityUnitConsumed = 0;
|
||||
protected long readCapacityUnitConsumed = 0;
|
||||
// real consumed quota
|
||||
private final long[] operationSize;
|
||||
// difference between estimated quota and real consumed quota used in close method
|
||||
// to adjust quota amount. Also used by ExceedOperationQuota which is a subclass
|
||||
// of DefaultOperationQuota
|
||||
protected long writeDiff = 0;
|
||||
protected long readDiff = 0;
|
||||
protected long writeCapacityUnitDiff = 0;
|
||||
protected long readCapacityUnitDiff = 0;
|
||||
|
||||
public DefaultOperationQuota(final Configuration conf, final QuotaLimiter... limiters) {
|
||||
this(conf, Arrays.asList(limiters));
|
||||
|
@ -69,12 +79,7 @@ public class DefaultOperationQuota implements OperationQuota {
|
|||
|
||||
@Override
|
||||
public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException {
|
||||
writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
|
||||
readConsumed = estimateConsume(OperationType.GET, numReads, 100);
|
||||
readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
|
||||
|
||||
writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
|
||||
readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
|
||||
updateEstimateConsumeQuota(numWrites, numReads, numScans);
|
||||
|
||||
writeAvailable = Long.MAX_VALUE;
|
||||
readAvailable = Long.MAX_VALUE;
|
||||
|
@ -96,12 +101,12 @@ public class DefaultOperationQuota implements OperationQuota {
|
|||
@Override
|
||||
public void close() {
|
||||
// Adjust the quota consumed for the specified operation
|
||||
long writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
|
||||
long readDiff = operationSize[OperationType.GET.ordinal()]
|
||||
writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
|
||||
readDiff = operationSize[OperationType.GET.ordinal()]
|
||||
+ operationSize[OperationType.SCAN.ordinal()] - readConsumed;
|
||||
long writeCapacityUnitDiff = calculateWriteCapacityUnitDiff(
|
||||
writeCapacityUnitDiff = calculateWriteCapacityUnitDiff(
|
||||
operationSize[OperationType.MUTATE.ordinal()], writeConsumed);
|
||||
long readCapacityUnitDiff = calculateReadCapacityUnitDiff(
|
||||
readCapacityUnitDiff = calculateReadCapacityUnitDiff(
|
||||
operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()],
|
||||
readConsumed);
|
||||
|
||||
|
@ -140,6 +145,21 @@ public class DefaultOperationQuota implements OperationQuota {
|
|||
operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation);
|
||||
}
|
||||
|
||||
/**
|
||||
* Update estimate quota(read/write size/capacityUnits) which will be consumed
|
||||
* @param numWrites the number of write requests
|
||||
* @param numReads the number of read requests
|
||||
* @param numScans the number of scan requests
|
||||
*/
|
||||
protected void updateEstimateConsumeQuota(int numWrites, int numReads, int numScans) {
|
||||
writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
|
||||
readConsumed = estimateConsume(OperationType.GET, numReads, 100);
|
||||
readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
|
||||
|
||||
writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
|
||||
readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
|
||||
}
|
||||
|
||||
private long estimateConsume(final OperationType type, int numReqs, long avgSize) {
|
||||
if (numReqs > 0) {
|
||||
return avgSize * numReqs;
|
||||
|
|
|
@ -0,0 +1,102 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/*
|
||||
* Internal class used to check and consume quota if exceed throttle quota is enabled. Exceed
|
||||
* throttle quota means, user can over consume user/namespace/table quota if region server has
|
||||
* additional available quota because other users don't consume at the same time.
|
||||
*
|
||||
* There are some limits when enable exceed throttle quota:
|
||||
* 1. Must set at least one read and one write region server throttle quota;
|
||||
* 2. All region server throttle quotas must be in seconds time unit. Because once previous requests
|
||||
* exceed their quota and consume region server quota, quota in other time units may be refilled in
|
||||
* a long time, this may affect later requests.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class ExceedOperationQuota extends DefaultOperationQuota {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ExceedOperationQuota.class);
|
||||
private QuotaLimiter regionServerLimiter;
|
||||
|
||||
public ExceedOperationQuota(final Configuration conf, QuotaLimiter regionServerLimiter,
|
||||
final QuotaLimiter... limiters) {
|
||||
super(conf, limiters);
|
||||
this.regionServerLimiter = regionServerLimiter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException {
|
||||
if (regionServerLimiter.isBypass()) {
|
||||
// If region server limiter is bypass, which means no region server quota is set, check and
|
||||
// throttle by all other quotas. In this condition, exceed throttle quota will not work.
|
||||
LOG.warn("Exceed throttle quota is enabled but no region server quotas found");
|
||||
super.checkQuota(numWrites, numReads, numScans);
|
||||
} else {
|
||||
// 1. Update estimate quota which will be consumed
|
||||
updateEstimateConsumeQuota(numWrites, numReads, numScans);
|
||||
// 2. Check if region server limiter is enough. If not, throw RpcThrottlingException.
|
||||
regionServerLimiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
|
||||
writeCapacityUnitConsumed, readCapacityUnitConsumed);
|
||||
// 3. Check if other limiters are enough. If not, exceed other limiters because region server
|
||||
// limiter is enough.
|
||||
boolean exceed = false;
|
||||
try {
|
||||
super.checkQuota(numWrites, numReads, numScans);
|
||||
} catch (RpcThrottlingException e) {
|
||||
exceed = true;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Read/Write requests num exceeds quota: writes:{} reads:{} scan:{}, "
|
||||
+ "try use region server quota",
|
||||
numWrites, numReads, numScans);
|
||||
}
|
||||
}
|
||||
// 4. Region server limiter is enough and grab estimated consume quota.
|
||||
readAvailable = Math.max(readAvailable, regionServerLimiter.getReadAvailable());
|
||||
writeAvailable = Math.max(writeAvailable, regionServerLimiter.getWriteAvailable());
|
||||
regionServerLimiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
|
||||
writeCapacityUnitConsumed, writeCapacityUnitConsumed);
|
||||
if (exceed) {
|
||||
// 5. Other quota limiter is exceeded and has not been grabbed (because throw
|
||||
// RpcThrottlingException in Step 3), so grab it.
|
||||
for (final QuotaLimiter limiter : limiters) {
|
||||
limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
|
||||
writeCapacityUnitConsumed, writeCapacityUnitConsumed);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
super.close();
|
||||
if (writeDiff != 0) {
|
||||
regionServerLimiter.consumeWrite(writeDiff, writeCapacityUnitDiff);
|
||||
}
|
||||
if (readDiff != 0) {
|
||||
regionServerLimiter.consumeRead(readDiff, readCapacityUnitDiff);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -50,6 +50,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrot
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.IsRpcThrottleEnabledResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchExceedThrottleQuotaResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse;
|
||||
|
||||
|
@ -291,7 +293,7 @@ public class MasterQuotaManager implements RegionStateListener {
|
|||
@Override
|
||||
public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
|
||||
QuotaUtil.addNamespaceQuota(masterServices.getConnection(), namespace,
|
||||
((GlobalQuotaSettingsImpl) quotaPojo).toQuotas());
|
||||
quotaPojo.toQuotas());
|
||||
}
|
||||
@Override
|
||||
public void delete() throws IOException {
|
||||
|
@ -320,7 +322,7 @@ public class MasterQuotaManager implements RegionStateListener {
|
|||
@Override
|
||||
public void update(GlobalQuotaSettingsImpl quotaPojo) throws IOException {
|
||||
QuotaUtil.addRegionServerQuota(masterServices.getConnection(), regionServer,
|
||||
((GlobalQuotaSettingsImpl) quotaPojo).toQuotas());
|
||||
quotaPojo.toQuotas());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -395,6 +397,33 @@ public class MasterQuotaManager implements RegionStateListener {
|
|||
}
|
||||
}
|
||||
|
||||
public SwitchExceedThrottleQuotaResponse
|
||||
switchExceedThrottleQuota(SwitchExceedThrottleQuotaRequest request) throws IOException {
|
||||
boolean enabled = request.getExceedThrottleQuotaEnabled();
|
||||
if (initialized) {
|
||||
masterServices.getMasterCoprocessorHost().preSwitchExceedThrottleQuota(enabled);
|
||||
boolean previousEnabled =
|
||||
QuotaUtil.isExceedThrottleQuotaEnabled(masterServices.getConnection());
|
||||
if (previousEnabled == enabled) {
|
||||
LOG.warn("Skip switch exceed throttle quota to {} because it's the same with old value",
|
||||
enabled);
|
||||
} else {
|
||||
QuotaUtil.switchExceedThrottleQuota(masterServices.getConnection(), enabled);
|
||||
LOG.info("{} switch exceed throttle quota from {} to {}",
|
||||
masterServices.getClientIdAuditPrefix(), previousEnabled, enabled);
|
||||
}
|
||||
SwitchExceedThrottleQuotaResponse response = SwitchExceedThrottleQuotaResponse.newBuilder()
|
||||
.setPreviousExceedThrottleQuotaEnabled(previousEnabled).build();
|
||||
masterServices.getMasterCoprocessorHost().postSwitchExceedThrottleQuota(previousEnabled,
|
||||
enabled);
|
||||
return response;
|
||||
} else {
|
||||
LOG.warn("Skip switch exceed throttle quota to {} because quota is disabled", enabled);
|
||||
return SwitchExceedThrottleQuotaResponse.newBuilder()
|
||||
.setPreviousExceedThrottleQuotaEnabled(false).build();
|
||||
}
|
||||
}
|
||||
|
||||
private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps)
|
||||
throws IOException, InterruptedException {
|
||||
if (req.hasRemoveAll() && req.getRemoveAll() == true) {
|
||||
|
|
|
@ -71,6 +71,7 @@ public class QuotaCache implements Stoppable {
|
|||
private final ConcurrentHashMap<String, UserQuotaState> userQuotaCache = new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<String, QuotaState> regionServerQuotaCache =
|
||||
new ConcurrentHashMap<>();
|
||||
private volatile boolean exceedThrottleQuotaEnabled = false;
|
||||
private final RegionServerServices rsServices;
|
||||
|
||||
private QuotaRefresherChore refreshChore;
|
||||
|
@ -158,6 +159,10 @@ public class QuotaCache implements Stoppable {
|
|||
return getQuotaState(this.regionServerQuotaCache, regionServer).getGlobalLimiter();
|
||||
}
|
||||
|
||||
protected boolean isExceedThrottleQuotaEnabled() {
|
||||
return exceedThrottleQuotaEnabled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the QuotaState requested. If the quota info is not in cache an empty one will be
|
||||
* returned and the quota request will be enqueued for the next cache refresh.
|
||||
|
@ -227,6 +232,7 @@ public class QuotaCache implements Stoppable {
|
|||
fetchTableQuotaState();
|
||||
fetchUserQuotaState();
|
||||
fetchRegionServerQuotaState();
|
||||
fetchExceedThrottleQuota();
|
||||
lastUpdate = EnvironmentEdgeManager.currentTime();
|
||||
}
|
||||
|
||||
|
@ -292,6 +298,15 @@ public class QuotaCache implements Stoppable {
|
|||
});
|
||||
}
|
||||
|
||||
private void fetchExceedThrottleQuota() {
|
||||
try {
|
||||
QuotaCache.this.exceedThrottleQuotaEnabled =
|
||||
QuotaUtil.isExceedThrottleQuotaEnabled(rsServices.getConnection());
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Unable to read if exceed throttle quota enabled from quota table", e);
|
||||
}
|
||||
}
|
||||
|
||||
private <K, V extends QuotaState> void fetch(final String type,
|
||||
final ConcurrentHashMap<K, V> quotasMap, final Fetcher<K, V> fetcher) {
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
|
|
|
@ -19,12 +19,14 @@
|
|||
package org.apache.hadoop.hbase.quotas;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
|
@ -39,12 +41,16 @@ import org.apache.hadoop.hbase.client.Table;
|
|||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;
|
||||
|
||||
/**
|
||||
* Helper class to interact with the quota table
|
||||
|
@ -153,6 +159,86 @@ public class QuotaUtil extends QuotaTableUtil {
|
|||
deleteQuotas(connection, getRegionServerRowKey(regionServer));
|
||||
}
|
||||
|
||||
protected static void switchExceedThrottleQuota(final Connection connection,
|
||||
boolean exceedThrottleQuotaEnabled) throws IOException {
|
||||
if (exceedThrottleQuotaEnabled) {
|
||||
checkRSQuotaToEnableExceedThrottle(
|
||||
getRegionServerQuota(connection, QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY));
|
||||
}
|
||||
|
||||
Put put = new Put(getExceedThrottleQuotaRowKey());
|
||||
put.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS,
|
||||
Bytes.toBytes(exceedThrottleQuotaEnabled));
|
||||
doPut(connection, put);
|
||||
}
|
||||
|
||||
private static void checkRSQuotaToEnableExceedThrottle(Quotas quotas) throws IOException {
|
||||
if (quotas != null && quotas.hasThrottle()) {
|
||||
Throttle throttle = quotas.getThrottle();
|
||||
// If enable exceed throttle quota, make sure that there are at least one read(req/read +
|
||||
// num/size/cu) and one write(req/write + num/size/cu) region server throttle quotas.
|
||||
boolean hasReadQuota = false;
|
||||
boolean hasWriteQuota = false;
|
||||
if (throttle.hasReqNum() || throttle.hasReqSize() || throttle.hasReqCapacityUnit()) {
|
||||
hasReadQuota = true;
|
||||
hasWriteQuota = true;
|
||||
}
|
||||
if (!hasReadQuota
|
||||
&& (throttle.hasReadNum() || throttle.hasReadSize() || throttle.hasReadCapacityUnit())) {
|
||||
hasReadQuota = true;
|
||||
}
|
||||
if (!hasReadQuota) {
|
||||
throw new DoNotRetryIOException(
|
||||
"Please set at least one read region server quota before enable exceed throttle quota");
|
||||
}
|
||||
if (!hasWriteQuota && (throttle.hasWriteNum() || throttle.hasWriteSize()
|
||||
|| throttle.hasWriteCapacityUnit())) {
|
||||
hasWriteQuota = true;
|
||||
}
|
||||
if (!hasWriteQuota) {
|
||||
throw new DoNotRetryIOException("Please set at least one write region server quota "
|
||||
+ "before enable exceed throttle quota");
|
||||
}
|
||||
// If enable exceed throttle quota, make sure that region server throttle quotas are in
|
||||
// seconds time unit. Because once previous requests exceed their quota and consume region
|
||||
// server quota, quota in other time units may be refilled in a long time, this may affect
|
||||
// later requests.
|
||||
List<Pair<Boolean, TimedQuota>> list =
|
||||
Arrays.asList(Pair.newPair(throttle.hasReqNum(), throttle.getReqNum()),
|
||||
Pair.newPair(throttle.hasReadNum(), throttle.getReadNum()),
|
||||
Pair.newPair(throttle.hasWriteNum(), throttle.getWriteNum()),
|
||||
Pair.newPair(throttle.hasReqSize(), throttle.getReqSize()),
|
||||
Pair.newPair(throttle.hasReadSize(), throttle.getReadSize()),
|
||||
Pair.newPair(throttle.hasWriteSize(), throttle.getWriteSize()),
|
||||
Pair.newPair(throttle.hasReqCapacityUnit(), throttle.getReqCapacityUnit()),
|
||||
Pair.newPair(throttle.hasReadCapacityUnit(), throttle.getReadCapacityUnit()),
|
||||
Pair.newPair(throttle.hasWriteCapacityUnit(), throttle.getWriteCapacityUnit()));
|
||||
for (Pair<Boolean, TimedQuota> pair : list) {
|
||||
if (pair.getFirst()) {
|
||||
if (pair.getSecond().getTimeUnit() != TimeUnit.SECONDS) {
|
||||
throw new DoNotRetryIOException("All region server quota must be "
|
||||
+ "in seconds time unit if enable exceed throttle quota");
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// If enable exceed throttle quota, make sure that region server quota is already set
|
||||
throw new DoNotRetryIOException(
|
||||
"Please set region server quota before enable exceed throttle quota");
|
||||
}
|
||||
}
|
||||
|
||||
protected static boolean isExceedThrottleQuotaEnabled(final Connection connection)
|
||||
throws IOException {
|
||||
Get get = new Get(getExceedThrottleQuotaRowKey());
|
||||
get.addColumn(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
|
||||
Result result = doGet(connection, get);
|
||||
if (result.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
return Bytes.toBoolean(result.getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS));
|
||||
}
|
||||
|
||||
private static void addQuotas(final Connection connection, final byte[] rowKey,
|
||||
final Quotas data) throws IOException {
|
||||
addQuotas(connection, rowKey, QUOTA_QUALIFIER_SETTINGS, data);
|
||||
|
|
|
@ -138,14 +138,20 @@ public class RegionServerRpcQuotaManager {
|
|||
QuotaLimiter rsLimiter = quotaCache
|
||||
.getRegionServerQuotaLimiter(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY);
|
||||
useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass() && rsLimiter.isBypass();
|
||||
boolean exceedThrottleQuotaEnabled = quotaCache.isExceedThrottleQuotaEnabled();
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter
|
||||
+ " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter + " rsLimiter="
|
||||
+ rsLimiter);
|
||||
+ rsLimiter + " exceedThrottleQuotaEnabled=" + exceedThrottleQuotaEnabled);
|
||||
}
|
||||
if (!useNoop) {
|
||||
return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter,
|
||||
tableLimiter, nsLimiter, rsLimiter);
|
||||
if (exceedThrottleQuotaEnabled) {
|
||||
return new ExceedOperationQuota(this.rsServices.getConfiguration(), rsLimiter,
|
||||
userLimiter, tableLimiter, nsLimiter);
|
||||
} else {
|
||||
return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter,
|
||||
tableLimiter, nsLimiter, rsLimiter);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2569,6 +2569,12 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
|
|||
requirePermission(ctx, "isRpcThrottleEnabled", Action.ADMIN);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preSwitchExceedThrottleQuota(ObserverContext<MasterCoprocessorEnvironment> ctx,
|
||||
boolean enable) throws IOException {
|
||||
requirePermission(ctx, "switchExceedThrottleQuota", Action.ADMIN);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the active user to which authorization checks should be applied.
|
||||
* If we are in the context of an RPC call, the remote user is used,
|
||||
|
|
|
@ -187,6 +187,12 @@ public class TestAsyncQuotaAdminApi extends TestAsyncAdminBase {
|
|||
assertEquals(true, future2.get().booleanValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSwitchExceedThrottleQuota() throws Exception {
|
||||
AsyncAdmin admin = ASYNC_CONN.getAdmin();
|
||||
assertEquals(false, admin.exceedThrottleQuotaSwitch(false).get().booleanValue());
|
||||
}
|
||||
|
||||
private void assertNumResults(int expected, final QuotaFilter filter) throws Exception {
|
||||
assertEquals(expected, countResults(filter));
|
||||
}
|
||||
|
|
|
@ -597,6 +597,48 @@ public class TestQuotaAdmin {
|
|||
testSwitchRpcThrottle(admin, false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSwitchExceedThrottleQuota() throws IOException {
|
||||
String regionServer = QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY;
|
||||
Admin admin = TEST_UTIL.getAdmin();
|
||||
|
||||
try {
|
||||
admin.exceedThrottleQuotaSwitch(true);
|
||||
fail("should not come here, because can't enable exceed throttle quota "
|
||||
+ "if there is no region server quota");
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Expected exception", e);
|
||||
}
|
||||
|
||||
admin.setQuota(QuotaSettingsFactory.throttleRegionServer(regionServer,
|
||||
ThrottleType.WRITE_NUMBER, 100, TimeUnit.SECONDS));
|
||||
try {
|
||||
admin.exceedThrottleQuotaSwitch(true);
|
||||
fail("should not come here, because can't enable exceed throttle quota "
|
||||
+ "if there is no read region server quota");
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Expected exception", e);
|
||||
}
|
||||
|
||||
admin.setQuota(QuotaSettingsFactory.throttleRegionServer(regionServer, ThrottleType.READ_NUMBER,
|
||||
20, TimeUnit.MINUTES));
|
||||
try {
|
||||
admin.exceedThrottleQuotaSwitch(true);
|
||||
fail("should not come here, because can't enable exceed throttle quota "
|
||||
+ "because not all region server quota are in seconds time unit");
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Expected exception", e);
|
||||
}
|
||||
admin.setQuota(QuotaSettingsFactory.throttleRegionServer(regionServer, ThrottleType.READ_NUMBER,
|
||||
20, TimeUnit.SECONDS));
|
||||
|
||||
assertFalse(admin.exceedThrottleQuotaSwitch(true));
|
||||
assertTrue(admin.exceedThrottleQuotaSwitch(true));
|
||||
assertTrue(admin.exceedThrottleQuotaSwitch(false));
|
||||
assertFalse(admin.exceedThrottleQuotaSwitch(false));
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleRegionServer(regionServer));
|
||||
}
|
||||
|
||||
private void testSwitchRpcThrottle(Admin admin, boolean oldRpcThrottle, boolean newRpcThrottle)
|
||||
throws IOException {
|
||||
boolean state = admin.switchRpcThrottle(newRpcThrottle);
|
||||
|
|
|
@ -579,20 +579,81 @@ public class TestQuotaThrottle {
|
|||
// requests are throttled by table quota
|
||||
admin.setQuota(QuotaSettingsFactory.throttleRegionServer(
|
||||
QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, ThrottleType.WRITE_NUMBER, 7, TimeUnit.MINUTES));
|
||||
triggerCacheRefresh(false, false, true, false, true, TABLE_NAMES[0]);
|
||||
triggerTableCacheRefresh(false, TABLE_NAMES[0]);
|
||||
triggerRegionServerCacheRefresh(false);
|
||||
assertEquals(5, doPuts(10, tables[0]));
|
||||
|
||||
// requests are throttled by region server quota
|
||||
admin.setQuota(QuotaSettingsFactory.throttleRegionServer(
|
||||
QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, ThrottleType.WRITE_NUMBER, 4, TimeUnit.MINUTES));
|
||||
triggerCacheRefresh(false, false, false, false, true, TABLE_NAMES[0]);
|
||||
triggerRegionServerCacheRefresh(false);
|
||||
assertEquals(4, doPuts(10, tables[0]));
|
||||
|
||||
// unthrottle
|
||||
admin.setQuota(
|
||||
QuotaSettingsFactory.unthrottleRegionServer(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY));
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0]));
|
||||
triggerCacheRefresh(true, false, true, false, true, TABLE_NAMES[0]);
|
||||
triggerTableCacheRefresh(true, TABLE_NAMES[0]);
|
||||
triggerRegionServerCacheRefresh(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExceedThrottleQuota() throws Exception {
|
||||
final Admin admin = TEST_UTIL.getAdmin();
|
||||
admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0], ThrottleType.WRITE_NUMBER, 5,
|
||||
TimeUnit.MINUTES));
|
||||
triggerTableCacheRefresh(false, TABLE_NAMES[0]);
|
||||
admin.setQuota(QuotaSettingsFactory.throttleRegionServer(
|
||||
QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, ThrottleType.WRITE_NUMBER, 20, TimeUnit.SECONDS));
|
||||
admin.setQuota(QuotaSettingsFactory.throttleRegionServer(
|
||||
QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, ThrottleType.READ_NUMBER, 10, TimeUnit.SECONDS));
|
||||
triggerRegionServerCacheRefresh(false);
|
||||
|
||||
// enable exceed throttle quota
|
||||
admin.exceedThrottleQuotaSwitch(true);
|
||||
// exceed table limit and allowed by region server limit
|
||||
triggerExceedThrottleQuotaCacheRefresh(true);
|
||||
waitMinuteQuota();
|
||||
assertEquals(10, doPuts(10, tables[0]));
|
||||
// exceed table limit and throttled by region server limit
|
||||
waitMinuteQuota();
|
||||
assertEquals(20, doPuts(25, tables[0]));
|
||||
|
||||
// set region server limiter is lower than table limiter
|
||||
admin.setQuota(QuotaSettingsFactory.throttleRegionServer(
|
||||
QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, ThrottleType.WRITE_NUMBER, 2, TimeUnit.SECONDS));
|
||||
triggerRegionServerCacheRefresh(false);
|
||||
// throttled by region server limiter
|
||||
waitMinuteQuota();
|
||||
assertEquals(2, doPuts(10, tables[0]));
|
||||
admin.setQuota(QuotaSettingsFactory.throttleRegionServer(
|
||||
QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY, ThrottleType.WRITE_NUMBER, 20, TimeUnit.SECONDS));
|
||||
triggerRegionServerCacheRefresh(false);
|
||||
|
||||
// disable exceed throttle quota
|
||||
admin.exceedThrottleQuotaSwitch(false);
|
||||
triggerExceedThrottleQuotaCacheRefresh(false);
|
||||
waitMinuteQuota();
|
||||
// throttled by table limit
|
||||
assertEquals(5, doPuts(10, tables[0]));
|
||||
|
||||
// enable exceed throttle quota and unthrottle region server
|
||||
admin.exceedThrottleQuotaSwitch(true);
|
||||
triggerExceedThrottleQuotaCacheRefresh(true);
|
||||
waitMinuteQuota();
|
||||
admin.setQuota(
|
||||
QuotaSettingsFactory.unthrottleRegionServer(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY));
|
||||
triggerRegionServerCacheRefresh(true);
|
||||
waitMinuteQuota();
|
||||
// throttled by table limit
|
||||
assertEquals(5, doPuts(10, tables[0]));
|
||||
|
||||
// disable exceed throttle quota
|
||||
admin.exceedThrottleQuotaSwitch(false);
|
||||
triggerExceedThrottleQuotaCacheRefresh(false);
|
||||
// unthrottle table
|
||||
admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0]));
|
||||
triggerTableCacheRefresh(true, TABLE_NAMES[0]);
|
||||
}
|
||||
|
||||
private int doPuts(int maxOps, final Table... tables) throws Exception {
|
||||
|
@ -647,29 +708,39 @@ public class TestQuotaThrottle {
|
|||
}
|
||||
|
||||
private void triggerUserCacheRefresh(boolean bypass, TableName... tables) throws Exception {
|
||||
triggerCacheRefresh(bypass, true, false, false, false, tables);
|
||||
triggerCacheRefresh(bypass, true, false, false, false, false, tables);
|
||||
}
|
||||
|
||||
private void triggerTableCacheRefresh(boolean bypass, TableName... tables) throws Exception {
|
||||
triggerCacheRefresh(bypass, false, true, false, false, tables);
|
||||
triggerCacheRefresh(bypass, false, true, false, false, false, tables);
|
||||
}
|
||||
|
||||
private void triggerNamespaceCacheRefresh(boolean bypass, TableName... tables) throws Exception {
|
||||
triggerCacheRefresh(bypass, false, false, true, false, tables);
|
||||
triggerCacheRefresh(bypass, false, false, true, false, false, tables);
|
||||
}
|
||||
|
||||
private void triggerRegionServerCacheRefresh(boolean bypass) throws Exception {
|
||||
triggerCacheRefresh(bypass, false, false, false, true, false);
|
||||
}
|
||||
|
||||
private void triggerExceedThrottleQuotaCacheRefresh(boolean exceedEnabled) throws Exception {
|
||||
triggerCacheRefresh(exceedEnabled, false, false, false, false, true);
|
||||
}
|
||||
|
||||
private void triggerCacheRefresh(boolean bypass, boolean userLimiter, boolean tableLimiter,
|
||||
boolean nsLimiter, boolean rsLimiter, final TableName... tables) throws Exception {
|
||||
boolean nsLimiter, boolean rsLimiter, boolean exceedThrottleQuota, final TableName... tables)
|
||||
throws Exception {
|
||||
envEdge.incValue(2 * REFRESH_TIME);
|
||||
for (RegionServerThread rst: TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
|
||||
RegionServerRpcQuotaManager quotaManager = rst.getRegionServer().getRegionServerRpcQuotaManager();
|
||||
for (RegionServerThread rst : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
|
||||
RegionServerRpcQuotaManager quotaManager =
|
||||
rst.getRegionServer().getRegionServerRpcQuotaManager();
|
||||
QuotaCache quotaCache = quotaManager.getQuotaCache();
|
||||
|
||||
quotaCache.triggerCacheRefresh();
|
||||
// sleep for cache update
|
||||
Thread.sleep(250);
|
||||
|
||||
for (TableName table: tables) {
|
||||
for (TableName table : tables) {
|
||||
quotaCache.getTableLimiter(table);
|
||||
}
|
||||
|
||||
|
@ -677,7 +748,7 @@ public class TestQuotaThrottle {
|
|||
while (!isUpdated) {
|
||||
quotaCache.triggerCacheRefresh();
|
||||
isUpdated = true;
|
||||
for (TableName table: tables) {
|
||||
for (TableName table : tables) {
|
||||
boolean isBypass = true;
|
||||
if (userLimiter) {
|
||||
isBypass &= quotaCache.getUserLimiter(User.getCurrent().getUGI(), table).isBypass();
|
||||
|
@ -688,17 +759,27 @@ public class TestQuotaThrottle {
|
|||
if (nsLimiter) {
|
||||
isBypass &= quotaCache.getNamespaceLimiter(table.getNamespaceAsString()).isBypass();
|
||||
}
|
||||
if (rsLimiter) {
|
||||
isBypass &= quotaCache
|
||||
.getRegionServerQuotaLimiter(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY)
|
||||
.isBypass();
|
||||
}
|
||||
if (isBypass != bypass) {
|
||||
envEdge.incValue(100);
|
||||
isUpdated = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (rsLimiter) {
|
||||
boolean rsIsBypass = quotaCache
|
||||
.getRegionServerQuotaLimiter(QuotaTableUtil.QUOTA_REGION_SERVER_ROW_KEY).isBypass();
|
||||
if (rsIsBypass != bypass) {
|
||||
envEdge.incValue(100);
|
||||
isUpdated = false;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (exceedThrottleQuota) {
|
||||
if (quotaCache.isExceedThrottleQuotaEnabled() != bypass) {
|
||||
envEdge.incValue(100);
|
||||
isUpdated = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LOG.debug("QuotaCache");
|
||||
|
|
|
@ -3499,6 +3499,20 @@ public class TestAccessController extends SecureTestUtil {
|
|||
verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSwitchExceedThrottleQuota() throws Exception {
|
||||
AccessTestAction action = new AccessTestAction() {
|
||||
@Override
|
||||
public Object run() throws Exception {
|
||||
ACCESS_CONTROLLER.preSwitchExceedThrottleQuota(ObserverContextImpl.createAndPrepare(CP_ENV),
|
||||
true);
|
||||
return null;
|
||||
}
|
||||
};
|
||||
verifyAllowed(action, SUPERUSER, USER_ADMIN);
|
||||
verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER);
|
||||
}
|
||||
|
||||
/*
|
||||
* Validate Global User ACL
|
||||
*/
|
||||
|
|
|
@ -268,6 +268,10 @@ module Hbase
|
|||
@admin.switchRpcThrottle(java.lang.Boolean.valueOf(enabled))
|
||||
end
|
||||
|
||||
def switch_exceed_throttle_quota(enabled)
|
||||
@admin.exceedThrottleQuotaSwitch(java.lang.Boolean.valueOf(enabled))
|
||||
end
|
||||
|
||||
def _parse_size(str_limit)
|
||||
str_limit = str_limit.downcase
|
||||
match = /^(\d+)([bkmgtp%]?)$/.match(str_limit)
|
||||
|
|
|
@ -441,6 +441,8 @@ Shell.load_command_group(
|
|||
list_snapshot_sizes
|
||||
enable_rpc_throttle
|
||||
disable_rpc_throttle
|
||||
enable_exceed_throttle_quota
|
||||
disable_exceed_throttle_quota
|
||||
]
|
||||
)
|
||||
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
#
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
module Shell
|
||||
module Commands
|
||||
class DisableExceedThrottleQuota < Command
|
||||
def help
|
||||
<<-EOF
|
||||
Disable exceed throttle quota. Returns previous exceed throttle quota enabled value.
|
||||
NOTE: if quota is not enabled, this will not work and always return false.
|
||||
|
||||
Examples:
|
||||
hbase> disable_exceed_throttle_quota
|
||||
EOF
|
||||
end
|
||||
|
||||
def command
|
||||
prev_state = quotas_admin.switch_exceed_throttle_quota(false) ? 'true' : 'false'
|
||||
formatter.row(["Previous exceed throttle quota enabled : #{prev_state}"])
|
||||
prev_state
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -0,0 +1,50 @@
|
|||
#
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
module Shell
|
||||
module Commands
|
||||
class EnableExceedThrottleQuota < Command
|
||||
def help
|
||||
<<-EOF
|
||||
Enable exceed throttle quota. Returns previous exceed throttle quota enabled value.
|
||||
NOTE: if quota is not enabled, this will not work and always return false.
|
||||
|
||||
If enabled, allow requests exceed user/table/namespace throttle quotas when region
|
||||
server has available quota.
|
||||
|
||||
There are two limits if enable exceed throttle quota. First, please set region server
|
||||
quota. Second, please make sure that all region server throttle quotas are in seconds
|
||||
time unit, because once previous requests exceed their quota and consume region server
|
||||
quota, quota in other time units may be refilled in a long time, which may affect later
|
||||
requests.
|
||||
|
||||
|
||||
Examples:
|
||||
hbase> enable_exceed_throttle_quota
|
||||
EOF
|
||||
end
|
||||
|
||||
def command
|
||||
prev_state = quotas_admin.switch_exceed_throttle_quota(true) ? 'true' : 'false'
|
||||
formatter.row(["Previous exceed throttle quota enabled : #{prev_state}"])
|
||||
prev_state
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -272,6 +272,16 @@ module Hbase
|
|||
output = capture_stdout{ command(:list_quotas) }
|
||||
assert(output.include?('0 row(s)'))
|
||||
end
|
||||
|
||||
define_test 'switch exceed throttle quota' do
|
||||
command(:set_quota, TYPE => THROTTLE, REGIONSERVER => 'all', LIMIT => '1CU/sec')
|
||||
output = capture_stdout { command(:enable_exceed_throttle_quota) }
|
||||
assert(output.include?('Previous exceed throttle quota enabled : false'))
|
||||
|
||||
output = capture_stdout { command(:disable_exceed_throttle_quota) }
|
||||
assert(output.include?('Previous exceed throttle quota enabled : true'))
|
||||
command(:set_quota, TYPE => THROTTLE, REGIONSERVER => 'all', LIMIT => NONE)
|
||||
end
|
||||
end
|
||||
# rubocop:enable Metrics/ClassLength
|
||||
end
|
||||
|
|
|
@ -511,6 +511,12 @@ public class ThriftAdmin implements Admin {
|
|||
"isRpcThrottleEnabled by pattern not supported in ThriftAdmin");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean exceedThrottleQuotaSwitch(boolean enable) throws IOException {
|
||||
throw new NotImplementedException(
|
||||
"exceedThrottleQuotaSwitch by pattern not supported in ThriftAdmin");
|
||||
}
|
||||
|
||||
@Override
|
||||
public HTableDescriptor[] disableTables(String regex) throws IOException {
|
||||
throw new NotImplementedException("disableTables by pattern not supported in ThriftAdmin");
|
||||
|
|
Loading…
Reference in New Issue