Revert "HBASE-21034 Add new throttle type: read/write capacity unit"

This reverts commit 483b7d008e.
This commit is contained in:
Guanghao Zhang 2019-01-19 09:49:04 +08:00
parent b06387b2b0
commit 450124fe56
19 changed files with 67 additions and 409 deletions

View File

@ -143,18 +143,6 @@ public class QuotaSettingsFactory {
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
ThrottleType.READ_SIZE, throttle.getReadSize()));
}
if (throttle.hasReqCapacityUnit()) {
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
ThrottleType.REQUEST_CAPACITY_UNIT, throttle.getReqCapacityUnit()));
}
if (throttle.hasReadCapacityUnit()) {
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
ThrottleType.READ_CAPACITY_UNIT, throttle.getReadCapacityUnit()));
}
if (throttle.hasWriteCapacityUnit()) {
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
ThrottleType.WRITE_CAPACITY_UNIT, throttle.getWriteCapacityUnit()));
}
return settings;
}

View File

@ -29,15 +29,13 @@ public class RpcThrottlingException extends HBaseIOException {
@InterfaceAudience.Public
public enum Type {
NumRequestsExceeded, RequestSizeExceeded, NumReadRequestsExceeded, NumWriteRequestsExceeded,
WriteSizeExceeded, ReadSizeExceeded, RequestCapacityUnitExceeded, ReadCapacityUnitExceeded,
WriteCapacityUnitExceeded
WriteSizeExceeded, ReadSizeExceeded,
}
private static final String[] MSG_TYPE =
new String[] { "number of requests exceeded", "request size limit exceeded",
"number of read requests exceeded", "number of write requests exceeded",
"write size limit exceeded", "read size limit exceeded", "request capacity unit exceeded",
"read capacity unit exceeded", "write capacity unit exceeded" };
"write size limit exceeded", "read size limit exceeded", };
private static final String MSG_WAIT = " - wait ";
@ -102,21 +100,6 @@ public class RpcThrottlingException extends HBaseIOException {
throwThrottlingException(Type.ReadSizeExceeded, waitInterval);
}
public static void throwRequestCapacityUnitExceeded(final long waitInterval)
throws RpcThrottlingException {
throwThrottlingException(Type.RequestCapacityUnitExceeded, waitInterval);
}
public static void throwReadCapacityUnitExceeded(final long waitInterval)
throws RpcThrottlingException {
throwThrottlingException(Type.ReadCapacityUnitExceeded, waitInterval);
}
public static void throwWriteCapacityUnitExceeded(final long waitInterval)
throws RpcThrottlingException {
throwThrottlingException(Type.WriteCapacityUnitExceeded, waitInterval);
}
private static void throwThrottlingException(final Type type, final long waitInterval)
throws RpcThrottlingException {
String msg = MSG_TYPE[type.ordinal()] + MSG_WAIT + StringUtils.formatTime(waitInterval);

View File

@ -95,12 +95,6 @@ class ThrottleSettings extends QuotaSettings {
case READ_SIZE:
builder.append(sizeToString(timedQuota.getSoftLimit()));
break;
case REQUEST_CAPACITY_UNIT:
case READ_CAPACITY_UNIT:
case WRITE_CAPACITY_UNIT:
builder.append(String.format("%dCU", timedQuota.getSoftLimit()));
break;
default:
}
} else if (timedQuota.hasShare()) {
builder.append(String.format("%.2f%%", timedQuota.getShare()));

View File

@ -41,13 +41,4 @@ public enum ThrottleType {
/** Throttling based on the read data size */
READ_SIZE,
/** Throttling based on the read+write capacity unit */
REQUEST_CAPACITY_UNIT,
/** Throttling based on the write data capacity unit */
WRITE_CAPACITY_UNIT,
/** Throttling based on the read data capacity unit */
READ_CAPACITY_UNIT,
}

View File

@ -2387,27 +2387,14 @@ public final class ProtobufUtil {
*/
public static ThrottleType toThrottleType(final QuotaProtos.ThrottleType proto) {
switch (proto) {
case REQUEST_NUMBER:
return ThrottleType.REQUEST_NUMBER;
case REQUEST_SIZE:
return ThrottleType.REQUEST_SIZE;
case REQUEST_CAPACITY_UNIT:
return ThrottleType.REQUEST_CAPACITY_UNIT;
case WRITE_NUMBER:
return ThrottleType.WRITE_NUMBER;
case WRITE_SIZE:
return ThrottleType.WRITE_SIZE;
case READ_NUMBER:
return ThrottleType.READ_NUMBER;
case READ_SIZE:
return ThrottleType.READ_SIZE;
case READ_CAPACITY_UNIT:
return ThrottleType.READ_CAPACITY_UNIT;
case WRITE_CAPACITY_UNIT:
return ThrottleType.WRITE_CAPACITY_UNIT;
default:
throw new RuntimeException("Invalid ThrottleType " + proto);
case REQUEST_NUMBER: return ThrottleType.REQUEST_NUMBER;
case REQUEST_SIZE: return ThrottleType.REQUEST_SIZE;
case WRITE_NUMBER: return ThrottleType.WRITE_NUMBER;
case WRITE_SIZE: return ThrottleType.WRITE_SIZE;
case READ_NUMBER: return ThrottleType.READ_NUMBER;
case READ_SIZE: return ThrottleType.READ_SIZE;
}
throw new RuntimeException("Invalid ThrottleType " + proto);
}
/**
@ -2418,27 +2405,14 @@ public final class ProtobufUtil {
*/
public static QuotaProtos.ThrottleType toProtoThrottleType(final ThrottleType type) {
switch (type) {
case REQUEST_NUMBER:
return QuotaProtos.ThrottleType.REQUEST_NUMBER;
case REQUEST_SIZE:
return QuotaProtos.ThrottleType.REQUEST_SIZE;
case WRITE_NUMBER:
return QuotaProtos.ThrottleType.WRITE_NUMBER;
case WRITE_SIZE:
return QuotaProtos.ThrottleType.WRITE_SIZE;
case READ_NUMBER:
return QuotaProtos.ThrottleType.READ_NUMBER;
case READ_SIZE:
return QuotaProtos.ThrottleType.READ_SIZE;
case REQUEST_CAPACITY_UNIT:
return QuotaProtos.ThrottleType.REQUEST_CAPACITY_UNIT;
case READ_CAPACITY_UNIT:
return QuotaProtos.ThrottleType.READ_CAPACITY_UNIT;
case WRITE_CAPACITY_UNIT:
return QuotaProtos.ThrottleType.WRITE_CAPACITY_UNIT;
default:
throw new RuntimeException("Invalid ThrottleType " + type);
case REQUEST_NUMBER: return QuotaProtos.ThrottleType.REQUEST_NUMBER;
case REQUEST_SIZE: return QuotaProtos.ThrottleType.REQUEST_SIZE;
case WRITE_NUMBER: return QuotaProtos.ThrottleType.WRITE_NUMBER;
case WRITE_SIZE: return QuotaProtos.ThrottleType.WRITE_SIZE;
case READ_NUMBER: return QuotaProtos.ThrottleType.READ_NUMBER;
case READ_SIZE: return QuotaProtos.ThrottleType.READ_SIZE;
}
throw new RuntimeException("Invalid ThrottleType " + type);
}
/**

View File

@ -46,9 +46,6 @@ enum ThrottleType {
WRITE_SIZE = 4;
READ_NUMBER = 5;
READ_SIZE = 6;
REQUEST_CAPACITY_UNIT = 7;
WRITE_CAPACITY_UNIT = 8;
READ_CAPACITY_UNIT = 9;
}
message Throttle {
@ -60,10 +57,6 @@ message Throttle {
optional TimedQuota read_num = 5;
optional TimedQuota read_size = 6;
optional TimedQuota req_capacity_unit = 7;
optional TimedQuota write_capacity_unit = 8;
optional TimedQuota read_capacity_unit = 9;
}
message ThrottleRequest {

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.quotas;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
@ -35,29 +34,20 @@ public class DefaultOperationQuota implements OperationQuota {
private static final Logger LOG = LoggerFactory.getLogger(DefaultOperationQuota.class);
private final List<QuotaLimiter> limiters;
private final long writeCapacityUnit;
private final long readCapacityUnit;
private long writeAvailable = 0;
private long readAvailable = 0;
private long writeConsumed = 0;
private long readConsumed = 0;
private long writeCapacityUnitConsumed = 0;
private long readCapacityUnitConsumed = 0;
private final long[] operationSize;
public DefaultOperationQuota(final Configuration conf, final QuotaLimiter... limiters) {
this(conf, Arrays.asList(limiters));
public DefaultOperationQuota(final QuotaLimiter... limiters) {
this(Arrays.asList(limiters));
}
/**
* NOTE: The order matters. It should be something like [user, table, namespace, global]
*/
public DefaultOperationQuota(final Configuration conf, final List<QuotaLimiter> limiters) {
this.writeCapacityUnit =
conf.getLong(QuotaUtil.WRITE_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_WRITE_CAPACITY_UNIT);
this.readCapacityUnit =
conf.getLong(QuotaUtil.READ_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_READ_CAPACITY_UNIT);
public DefaultOperationQuota(final List<QuotaLimiter> limiters) {
this.limiters = limiters;
int size = OperationType.values().length;
operationSize = new long[size];
@ -68,28 +58,24 @@ public class DefaultOperationQuota implements OperationQuota {
}
@Override
public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException {
public void checkQuota(int numWrites, int numReads, int numScans)
throws RpcThrottlingException {
writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
readConsumed = estimateConsume(OperationType.GET, numReads, 100);
readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
writeAvailable = Long.MAX_VALUE;
readAvailable = Long.MAX_VALUE;
for (final QuotaLimiter limiter : limiters) {
for (final QuotaLimiter limiter: limiters) {
if (limiter.isBypass()) continue;
limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
writeCapacityUnitConsumed, readCapacityUnitConsumed);
limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed);
readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
writeAvailable = Math.min(writeAvailable, limiter.getWriteAvailable());
}
for (final QuotaLimiter limiter : limiters) {
limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
writeCapacityUnitConsumed, readCapacityUnitConsumed);
for (final QuotaLimiter limiter: limiters) {
limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed);
}
}
@ -97,21 +83,12 @@ public class DefaultOperationQuota implements OperationQuota {
public void close() {
// Adjust the quota consumed for the specified operation
long writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
long readDiff = operationSize[OperationType.GET.ordinal()]
+ operationSize[OperationType.SCAN.ordinal()] - readConsumed;
long writeCapacityUnitDiff = calculateWriteCapacityUnitDiff(
operationSize[OperationType.MUTATE.ordinal()], writeConsumed);
long readCapacityUnitDiff = calculateReadCapacityUnitDiff(
operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()],
readConsumed);
long readDiff = operationSize[OperationType.GET.ordinal()] +
operationSize[OperationType.SCAN.ordinal()] - readConsumed;
for (final QuotaLimiter limiter : limiters) {
if (writeDiff != 0) {
limiter.consumeWrite(writeDiff, writeCapacityUnitDiff);
}
if (readDiff != 0) {
limiter.consumeRead(readDiff, readCapacityUnitDiff);
}
for (final QuotaLimiter limiter: limiters) {
if (writeDiff != 0) limiter.consumeWrite(writeDiff);
if (readDiff != 0) limiter.consumeRead(readDiff);
}
}
@ -146,20 +123,4 @@ public class DefaultOperationQuota implements OperationQuota {
}
return 0;
}
private long calculateWriteCapacityUnit(final long size) {
return (long) Math.ceil(size * 1.0 / this.writeCapacityUnit);
}
private long calculateReadCapacityUnit(final long size) {
return (long) Math.ceil(size * 1.0 / this.readCapacityUnit);
}
private long calculateWriteCapacityUnitDiff(final long actualSize, final long estimateSize) {
return calculateWriteCapacityUnit(actualSize) - calculateWriteCapacityUnit(estimateSize);
}
private long calculateReadCapacityUnitDiff(final long actualSize, final long estimateSize) {
return calculateReadCapacityUnit(actualSize) - calculateReadCapacityUnit(estimateSize);
}
}

View File

@ -149,16 +149,6 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings {
case READ_SIZE:
throttleBuilder.setReadSize(otherProto.getTimedQuota());
break;
case REQUEST_CAPACITY_UNIT:
throttleBuilder.setReqCapacityUnit(otherProto.getTimedQuota());
break;
case READ_CAPACITY_UNIT:
throttleBuilder.setReadCapacityUnit(otherProto.getTimedQuota());
break;
case WRITE_CAPACITY_UNIT:
throttleBuilder.setWriteCapacityUnit(otherProto.getTimedQuota());
break;
default:
}
}
}
@ -242,11 +232,6 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings {
case READ_SIZE:
builder.append(sizeToString(timedQuota.getSoftLimit()));
break;
case REQUEST_CAPACITY_UNIT:
case READ_CAPACITY_UNIT:
case WRITE_CAPACITY_UNIT:
builder.append(String.format("%dCU", timedQuota.getSoftLimit()));
default:
}
} else if (timedQuota.hasShare()) {
builder.append(String.format("%.2f%%", timedQuota.getShare()));
@ -304,15 +289,6 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings {
if (proto.hasWriteSize()) {
quotas.put(ThrottleType.WRITE_SIZE, proto.getWriteSize());
}
if (proto.hasReqCapacityUnit()) {
quotas.put(ThrottleType.REQUEST_CAPACITY_UNIT, proto.getReqCapacityUnit());
}
if (proto.hasReadCapacityUnit()) {
quotas.put(ThrottleType.READ_CAPACITY_UNIT, proto.getReqCapacityUnit());
}
if (proto.hasWriteCapacityUnit()) {
quotas.put(ThrottleType.WRITE_CAPACITY_UNIT, proto.getWriteCapacityUnit());
}
return quotas;
}
@ -323,8 +299,5 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings {
builder.clearReqSize();
builder.clearWriteNum();
builder.clearWriteSize();
builder.clearReadCapacityUnit();
builder.clearReadCapacityUnit();
builder.clearWriteCapacityUnit();
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.quotas;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
/**
* Noop quota limiter returned when no limiter is associated to the user/table
@ -35,24 +36,22 @@ class NoopQuotaLimiter implements QuotaLimiter {
@Override
public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit)
throws RpcThrottlingException {
long estimateReadSize) throws RpcThrottlingException {
// no-op
}
@Override
public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
long writeCapacityUnit, long readCapacityUnit) {
public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize) {
// no-op
}
@Override
public void consumeWrite(final long size, long capacityUnit) {
public void consumeWrite(final long size) {
// no-op
}
@Override
public void consumeRead(final long size, long capacityUnit) {
public void consumeRead(final long size) {
// no-op
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.quotas;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
/**
* Internal interface used to interact with the user/table quota.
@ -34,14 +35,10 @@ public interface QuotaLimiter {
* @param estimateWriteSize the write size that will be checked against the available quota
* @param readReqs the read requests that will be checked against the available quota
* @param estimateReadSize the read size that will be checked against the available quota
* @param estimateWriteCapacityUnit the write capacity unit that will be checked against the
* available quota
* @param estimateReadCapacityUnit the read capacity unit that will be checked against the
* available quota
* @throws RpcThrottlingException thrown if not enough available resources to perform operation.
*/
void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize,
long estimateWriteCapacityUnit, long estimateReadCapacityUnit) throws RpcThrottlingException;
void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize)
throws RpcThrottlingException;
/**
* Removes the specified write and read amount from the quota.
@ -52,23 +49,20 @@ public interface QuotaLimiter {
* @param writeSize the write size that will be removed from the current quota
* @param readReqs the read requests that will be removed from the current quota
* @param readSize the read size that will be removed from the current quota
* @param writeCapacityUnit the write capacity unit that will be removed from the current quota
* @param readCapacityUnit the read capacity unit num that will be removed from the current quota
*/
void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
long writeCapacityUnit, long readCapacityUnit);
void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize);
/**
* Removes or add back some write amount to the quota.
* (called at the end of an operation in case the estimate quota was off)
*/
void consumeWrite(long size, long capacityUnit);
void consumeWrite(long size);
/**
* Removes or add back some read amount to the quota.
* (called at the end of an operation in case the estimate quota was off)
*/
void consumeRead(long size, long capacityUnit);
void consumeRead(long size);
/** @return true if the limiter is a noop */
boolean isBypass();

View File

@ -57,13 +57,6 @@ public class QuotaUtil extends QuotaTableUtil {
public static final String QUOTA_CONF_KEY = "hbase.quota.enabled";
private static final boolean QUOTA_ENABLED_DEFAULT = false;
public static final String READ_CAPACITY_UNIT_CONF_KEY = "hbase.quota.read.capacity.unit";
// the default one read capacity unit is 1024 bytes (1KB)
public static final long DEFAULT_READ_CAPACITY_UNIT = 1024;
public static final String WRITE_CAPACITY_UNIT_CONF_KEY = "hbase.quota.write.capacity.unit";
// the default one write capacity unit is 1024 bytes (1KB)
public static final long DEFAULT_WRITE_CAPACITY_UNIT = 1024;
/** Table descriptor for Quota internal table */
public static final HTableDescriptor QUOTA_TABLE_DESC =
new HTableDescriptor(QUOTA_TABLE_NAME);

View File

@ -102,7 +102,7 @@ public class RegionServerRpcQuotaManager {
LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter);
}
if (!useNoop) {
return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter);
return new DefaultOperationQuota(userLimiter);
}
} else {
QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
@ -113,8 +113,7 @@ public class RegionServerRpcQuotaManager {
userLimiter + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter);
}
if (!useNoop) {
return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter,
tableLimiter, nsLimiter);
return new DefaultOperationQuota(userLimiter, tableLimiter, nsLimiter);
}
}
}

View File

@ -40,9 +40,6 @@ public class TimeBasedLimiter implements QuotaLimiter {
private RateLimiter writeSizeLimiter = null;
private RateLimiter readReqsLimiter = null;
private RateLimiter readSizeLimiter = null;
private RateLimiter reqCapacityUnitLimiter = null;
private RateLimiter writeCapacityUnitLimiter = null;
private RateLimiter readCapacityUnitLimiter = null;
private TimeBasedLimiter() {
if (FixedIntervalRateLimiter.class.getName().equals(
@ -54,9 +51,6 @@ public class TimeBasedLimiter implements QuotaLimiter {
writeSizeLimiter = new FixedIntervalRateLimiter();
readReqsLimiter = new FixedIntervalRateLimiter();
readSizeLimiter = new FixedIntervalRateLimiter();
reqCapacityUnitLimiter = new FixedIntervalRateLimiter();
writeCapacityUnitLimiter = new FixedIntervalRateLimiter();
readCapacityUnitLimiter = new FixedIntervalRateLimiter();
} else {
reqsLimiter = new AverageIntervalRateLimiter();
reqSizeLimiter = new AverageIntervalRateLimiter();
@ -64,9 +58,6 @@ public class TimeBasedLimiter implements QuotaLimiter {
writeSizeLimiter = new AverageIntervalRateLimiter();
readReqsLimiter = new AverageIntervalRateLimiter();
readSizeLimiter = new AverageIntervalRateLimiter();
reqCapacityUnitLimiter = new AverageIntervalRateLimiter();
writeCapacityUnitLimiter = new AverageIntervalRateLimiter();
readCapacityUnitLimiter = new AverageIntervalRateLimiter();
}
}
@ -102,21 +93,6 @@ public class TimeBasedLimiter implements QuotaLimiter {
setFromTimedQuota(limiter.readSizeLimiter, throttle.getReadSize());
isBypass = false;
}
if (throttle.hasReqCapacityUnit()) {
setFromTimedQuota(limiter.reqCapacityUnitLimiter, throttle.getReqCapacityUnit());
isBypass = false;
}
if (throttle.hasWriteCapacityUnit()) {
setFromTimedQuota(limiter.writeCapacityUnitLimiter, throttle.getWriteCapacityUnit());
isBypass = false;
}
if (throttle.hasReadCapacityUnit()) {
setFromTimedQuota(limiter.readCapacityUnitLimiter, throttle.getReadCapacityUnit());
isBypass = false;
}
return isBypass ? NoopQuotaLimiter.get() : limiter;
}
@ -127,9 +103,6 @@ public class TimeBasedLimiter implements QuotaLimiter {
writeSizeLimiter.update(other.writeSizeLimiter);
readReqsLimiter.update(other.readReqsLimiter);
readSizeLimiter.update(other.readSizeLimiter);
reqCapacityUnitLimiter.update(other.reqCapacityUnitLimiter);
writeCapacityUnitLimiter.update(other.writeCapacityUnitLimiter);
readCapacityUnitLimiter.update(other.readCapacityUnitLimiter);
}
private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuota timedQuota) {
@ -138,8 +111,7 @@ public class TimeBasedLimiter implements QuotaLimiter {
@Override
public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit)
throws RpcThrottlingException {
long estimateReadSize) throws RpcThrottlingException {
if (!reqsLimiter.canExecute(writeReqs + readReqs)) {
RpcThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval());
}
@ -147,10 +119,6 @@ public class TimeBasedLimiter implements QuotaLimiter {
RpcThrottlingException.throwRequestSizeExceeded(
reqSizeLimiter.waitInterval(estimateWriteSize + estimateReadSize));
}
if (!reqCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit + estimateReadCapacityUnit)) {
RpcThrottlingException.throwRequestCapacityUnitExceeded(
reqCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit + estimateReadCapacityUnit));
}
if (estimateWriteSize > 0) {
if (!writeReqsLimiter.canExecute(writeReqs)) {
@ -160,10 +128,6 @@ public class TimeBasedLimiter implements QuotaLimiter {
RpcThrottlingException.throwWriteSizeExceeded(
writeSizeLimiter.waitInterval(estimateWriteSize));
}
if (!writeCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit)) {
RpcThrottlingException.throwWriteCapacityUnitExceeded(
writeCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit));
}
}
if (estimateReadSize > 0) {
@ -174,16 +138,11 @@ public class TimeBasedLimiter implements QuotaLimiter {
RpcThrottlingException.throwReadSizeExceeded(
readSizeLimiter.waitInterval(estimateReadSize));
}
if (!readCapacityUnitLimiter.canExecute(estimateReadCapacityUnit)) {
RpcThrottlingException.throwReadCapacityUnitExceeded(
readCapacityUnitLimiter.waitInterval(estimateReadCapacityUnit));
}
}
}
@Override
public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
long writeCapacityUnit, long readCapacityUnit) {
public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize) {
assert writeSize != 0 || readSize != 0;
reqsLimiter.consume(writeReqs + readReqs);
@ -197,30 +156,18 @@ public class TimeBasedLimiter implements QuotaLimiter {
readReqsLimiter.consume(readReqs);
readSizeLimiter.consume(readSize);
}
if (writeCapacityUnit > 0) {
reqCapacityUnitLimiter.consume(writeCapacityUnit);
writeCapacityUnitLimiter.consume(writeCapacityUnit);
}
if (readCapacityUnit > 0) {
reqCapacityUnitLimiter.consume(readCapacityUnit);
readCapacityUnitLimiter.consume(readCapacityUnit);
}
}
@Override
public void consumeWrite(final long size, long capacityUnit) {
public void consumeWrite(final long size) {
reqSizeLimiter.consume(size);
writeSizeLimiter.consume(size);
reqCapacityUnitLimiter.consume(capacityUnit);
writeCapacityUnitLimiter.consume(capacityUnit);
}
@Override
public void consumeRead(final long size, long capacityUnit) {
public void consumeRead(final long size) {
reqSizeLimiter.consume(size);
readSizeLimiter.consume(size);
reqCapacityUnitLimiter.consume(capacityUnit);
readCapacityUnitLimiter.consume(capacityUnit);
}
@Override
@ -242,33 +189,12 @@ public class TimeBasedLimiter implements QuotaLimiter {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("TimeBasedLimiter(");
if (!reqsLimiter.isBypass()) {
builder.append("reqs=" + reqsLimiter);
}
if (!reqSizeLimiter.isBypass()) {
builder.append(" resSize=" + reqSizeLimiter);
}
if (!writeReqsLimiter.isBypass()) {
builder.append(" writeReqs=" + writeReqsLimiter);
}
if (!writeSizeLimiter.isBypass()) {
builder.append(" writeSize=" + writeSizeLimiter);
}
if (!readReqsLimiter.isBypass()) {
builder.append(" readReqs=" + readReqsLimiter);
}
if (!readSizeLimiter.isBypass()) {
builder.append(" readSize=" + readSizeLimiter);
}
if (!reqCapacityUnitLimiter.isBypass()) {
builder.append(" reqCapacityUnit=" + reqCapacityUnitLimiter);
}
if (!writeCapacityUnitLimiter.isBypass()) {
builder.append(" writeCapacityUnit=" + writeCapacityUnitLimiter);
}
if (!readCapacityUnitLimiter.isBypass()) {
builder.append(" readCapacityUnit=" + readCapacityUnitLimiter);
}
if (!reqsLimiter.isBypass()) builder.append("reqs=" + reqsLimiter);
if (!reqSizeLimiter.isBypass()) builder.append(" resSize=" + reqSizeLimiter);
if (!writeReqsLimiter.isBypass()) builder.append(" writeReqs=" + writeReqsLimiter);
if (!writeSizeLimiter.isBypass()) builder.append(" writeSize=" + writeSizeLimiter);
if (!readReqsLimiter.isBypass()) builder.append(" readReqs=" + readReqsLimiter);
if (!readSizeLimiter.isBypass()) builder.append(" readSize=" + readSizeLimiter);
builder.append(')');
return builder.toString();
}

View File

@ -455,22 +455,17 @@ public class TestQuotaAdmin {
@Test
public void testSetGetRemoveRPCQuota() throws Exception {
testSetGetRemoveRPCQuota(ThrottleType.REQUEST_SIZE);
testSetGetRemoveRPCQuota(ThrottleType.REQUEST_CAPACITY_UNIT);
}
private void testSetGetRemoveRPCQuota(ThrottleType throttleType) throws Exception {
Admin admin = TEST_UTIL.getAdmin();
final TableName tn = TableName.valueOf("sq_table1");
QuotaSettings settings =
QuotaSettingsFactory.throttleTable(tn, throttleType, 2L, TimeUnit.HOURS);
QuotaSettingsFactory.throttleTable(tn, ThrottleType.REQUEST_SIZE, 2L, TimeUnit.HOURS);
admin.setQuota(settings);
// Verify the Quota in the table
verifyRecordPresentInQuotaTable(throttleType, 2L, TimeUnit.HOURS);
verifyRecordPresentInQuotaTable(ThrottleType.REQUEST_SIZE, 2L, TimeUnit.HOURS);
// Verify we can retrieve it via the QuotaRetriever API
verifyFetchableViaAPI(admin, throttleType, 2L, TimeUnit.HOURS);
verifyFetchableViaAPI(admin, ThrottleType.REQUEST_SIZE, 2L, TimeUnit.HOURS);
// Now, remove the quota
QuotaSettings removeQuota = QuotaSettingsFactory.unthrottleTable(tn);
@ -589,19 +584,6 @@ public class TestQuotaAdmin {
assertTrue(rpcQuota.hasWriteSize());
t = rpcQuota.getWriteSize();
break;
case REQUEST_CAPACITY_UNIT:
assertTrue(rpcQuota.hasReqCapacityUnit());
t = rpcQuota.getReqCapacityUnit();
break;
case READ_CAPACITY_UNIT:
assertTrue(rpcQuota.hasReadCapacityUnit());
t = rpcQuota.getReadCapacityUnit();
break;
case WRITE_CAPACITY_UNIT:
assertTrue(rpcQuota.hasWriteCapacityUnit());
t = rpcQuota.getWriteCapacityUnit();
break;
default:
}
assertEquals(t.getSoftLimit(), limit);

View File

@ -224,7 +224,7 @@ public class TestQuotaState {
assertFalse(quotaInfo.isBypass());
QuotaLimiter limiter = quotaInfo.getTableLimiter(TABLE_A);
try {
limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0, 1, 0);
limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0);
fail("Should have thrown RpcThrottlingException");
} catch (RpcThrottlingException e) {
// expected
@ -242,7 +242,7 @@ public class TestQuotaState {
private void assertThrottleException(final QuotaLimiter limiter, final int availReqs) {
assertNoThrottleException(limiter, availReqs);
try {
limiter.checkQuota(1, 1, 0, 0, 1, 0);
limiter.checkQuota(1, 1, 0, 0);
fail("Should have thrown RpcThrottlingException");
} catch (RpcThrottlingException e) {
// expected
@ -252,11 +252,11 @@ public class TestQuotaState {
private void assertNoThrottleException(final QuotaLimiter limiter, final int availReqs) {
for (int i = 0; i < availReqs; ++i) {
try {
limiter.checkQuota(1, 1, 0, 0, 1, 0);
limiter.checkQuota(1, 1, 0, 0);
} catch (RpcThrottlingException e) {
fail("Unexpected RpcThrottlingException after " + i + " requests. limit=" + availReqs);
}
limiter.grabQuota(1, 1, 0, 0, 1, 0);
limiter.grabQuota(1, 1, 0, 0);
}
}

View File

@ -521,68 +521,18 @@ public class TestQuotaThrottle {
Table table = TEST_UTIL.getConnection().getTable(TABLE_NAMES[0]);
// An exists call when having throttle quota
table.exists(new Get(Bytes.toBytes("abc")));
admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0]));
triggerTableCacheRefresh(true, TABLE_NAMES[0]);
}
@Test
public void testTableWriteCapacityUnitThrottle() throws Exception {
final Admin admin = TEST_UTIL.getAdmin();
// Add 6CU/min limit
admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0],
ThrottleType.WRITE_CAPACITY_UNIT, 6, TimeUnit.MINUTES));
triggerTableCacheRefresh(false, TABLE_NAMES[0]);
// should execute at max 6 capacity units because each put size is 1 capacity unit
assertEquals(6, doPuts(20, 10, tables[0]));
// wait a minute and you should execute at max 3 capacity units because each put size is 2
// capacity unit
waitMinuteQuota();
assertEquals(3, doPuts(20, 1025, tables[0]));
admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0]));
triggerTableCacheRefresh(true, TABLE_NAMES[0]);
}
@Test
public void testTableReadCapacityUnitThrottle() throws Exception {
final Admin admin = TEST_UTIL.getAdmin();
// Add 6CU/min limit
admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0],
ThrottleType.READ_CAPACITY_UNIT, 6, TimeUnit.MINUTES));
triggerTableCacheRefresh(false, TABLE_NAMES[0]);
assertEquals(20, doPuts(20, 10, tables[0]));
// should execute at max 6 capacity units because each get size is 1 capacity unit
assertEquals(6, doGets(20, tables[0]));
assertEquals(20, doPuts(20, 2015, tables[0]));
// wait a minute and you should execute at max 3 capacity units because each get size is 2
// capacity unit on tables[0]
waitMinuteQuota();
assertEquals(3, doGets(20, tables[0]));
}
private int doPuts(int maxOps, final Table... tables) throws Exception {
return doPuts(maxOps, -1, tables);
}
private int doPuts(int maxOps, int valueSize, final Table... tables) throws Exception {
int count = 0;
try {
while (count < maxOps) {
Put put = new Put(Bytes.toBytes("row-" + count));
byte[] value;
if (valueSize < 0) {
value = Bytes.toBytes("data-" + count);
} else {
value = generateValue(valueSize);
}
put.addColumn(FAMILY, QUALIFIER, value);
for (final Table table : tables) {
put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("data-" + count));
for (final Table table: tables) {
table.put(put);
}
count += tables.length;
@ -593,14 +543,6 @@ public class TestQuotaThrottle {
return count;
}
private byte[] generateValue(int valueSize) {
byte[] bytes = new byte[valueSize];
for (int i = 0; i < valueSize; i++) {
bytes[i] = 'a';
}
return bytes;
}
private long doGets(int maxOps, final Table... tables) throws Exception {
int count = 0;
try {

View File

@ -263,14 +263,11 @@ module Hbase
def _parse_limit(str_limit, type_cls, type)
str_limit = str_limit.downcase
match = /(\d+)(req|cu|[bkmgtp])\/(sec|min|hour|day)/.match(str_limit)
match = /(\d+)(req|[bkmgtp])\/(sec|min|hour|day)/.match(str_limit)
if match
if match[2] == 'req'
limit = match[1].to_i
type = type_cls.valueOf(type + '_NUMBER')
elsif match[2] == 'cu'
limit = match[1].to_i
type = type_cls.valueOf(type + '_CAPACITY_UNIT')
else
limit = _size_from_str(match[1].to_i, match[2])
type = type_cls.valueOf(type + '_SIZE')

View File

@ -26,12 +26,11 @@ Set a quota for a user, table, or namespace.
Syntax : set_quota TYPE => <type>, <args>
TYPE => THROTTLE
User can either set quota on read, write or on both the requests together(i.e., read+write).
User can either set quota on read, write or on both the requests together(i.e., read+write)
The read, write, or read+write(default throttle type) request limit can be expressed using
the form 100req/sec, 100req/min; the read, write, read+write(default throttle type) limit
the form 100req/sec, 100req/min and the read, write, read+write(default throttle type) limit
can be expressed using the form 100k/sec, 100M/min with (B, K, M, G, T, P) as valid size unit
; the read, write, read+write(default throttle type) limit can be expressed using the form
100CU/sec as capacity unit. The valid time units are (sec, min, hour, day).
and (sec, min, hour, day) as valid time unit.
Currently the throttle limit is per machine - a limit of 100req/min
means that each machine can execute 100req/min.
@ -43,9 +42,6 @@ For example:
hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10M/sec'
hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => WRITE, USER => 'u1', LIMIT => '10M/sec'
hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10CU/sec'
hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => WRITE, USER => 'u1', LIMIT => '10CU/sec'
hbase> set_quota TYPE => THROTTLE, USER => 'u1', TABLE => 't2', LIMIT => '5K/min'
hbase> set_quota TYPE => THROTTLE, USER => 'u1', NAMESPACE => 'ns2', LIMIT => NONE

View File

@ -135,32 +135,5 @@ module Hbase
assert(output.include? snapshot1)
assert(output.include? snapshot2)
end
define_test 'can set and remove user CU quota' do
command(:set_quota, TYPE => THROTTLE, USER => 'user1', LIMIT => '1CU/sec')
output = capture_stdout{ command(:list_quotas) }
assert(output.include?('USER => user1'))
assert(output.include?('TYPE => THROTTLE'))
assert(output.include?('THROTTLE_TYPE => REQUEST_CAPACITY_UNIT'))
assert(output.include?('LIMIT => 1CU/sec'))
command(:set_quota, TYPE => THROTTLE, USER => 'user1', LIMIT => NONE)
output = capture_stdout{ command(:list_quotas) }
assert(output.include?('0 row(s)'))
end
define_test 'can set and remove table CU quota' do
command(:set_quota, TYPE => THROTTLE, TABLE => @test_name,
THROTTLE_TYPE => WRITE, LIMIT => '2CU/min')
output = capture_stdout{ command(:list_quotas) }
assert(output.include?('TABLE => hbase_shell_quota_tests_table'))
assert(output.include?('TYPE => THROTTLE'))
assert(output.include?('THROTTLE_TYPE => WRITE_CAPACITY_UNIT'))
assert(output.include?('LIMIT => 2CU/min'))
command(:set_quota, TYPE => THROTTLE, TABLE => @test_name, LIMIT => NONE)
output = capture_stdout{ command(:list_quotas) }
assert(output.include?('0 row(s)'))
end
end
end