HBASE-21034 Add new throttle type: read/write capacity unit

Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
meiyi 2018-11-19 17:17:30 +08:00 committed by Guanghao Zhang
parent 6a64811f44
commit d590d6e472
18 changed files with 394 additions and 65 deletions

View File

@ -143,6 +143,18 @@ public class QuotaSettingsFactory {
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace, settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
ThrottleType.READ_SIZE, throttle.getReadSize())); ThrottleType.READ_SIZE, throttle.getReadSize()));
} }
if (throttle.hasReqCapacityUnit()) {
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
ThrottleType.REQUEST_CAPACITY_UNIT, throttle.getReqCapacityUnit()));
}
if (throttle.hasReadCapacityUnit()) {
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
ThrottleType.READ_CAPACITY_UNIT, throttle.getReadCapacityUnit()));
}
if (throttle.hasWriteCapacityUnit()) {
settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace,
ThrottleType.WRITE_CAPACITY_UNIT, throttle.getWriteCapacityUnit()));
}
return settings; return settings;
} }

View File

@ -95,6 +95,12 @@ class ThrottleSettings extends QuotaSettings {
case READ_SIZE: case READ_SIZE:
builder.append(sizeToString(timedQuota.getSoftLimit())); builder.append(sizeToString(timedQuota.getSoftLimit()));
break; break;
case REQUEST_CAPACITY_UNIT:
case READ_CAPACITY_UNIT:
case WRITE_CAPACITY_UNIT:
builder.append(String.format("%dCU", timedQuota.getSoftLimit()));
break;
default:
} }
} else if (timedQuota.hasShare()) { } else if (timedQuota.hasShare()) {
builder.append(String.format("%.2f%%", timedQuota.getShare())); builder.append(String.format("%.2f%%", timedQuota.getShare()));

View File

@ -41,4 +41,13 @@ public enum ThrottleType {
/** Throttling based on the read data size */ /** Throttling based on the read data size */
READ_SIZE, READ_SIZE,
/** Throttling based on the read+write capacity unit */
REQUEST_CAPACITY_UNIT,
/** Throttling based on the write data capacity unit */
WRITE_CAPACITY_UNIT,
/** Throttling based on the read data capacity unit */
READ_CAPACITY_UNIT,
} }

View File

@ -2389,15 +2389,28 @@ public final class ProtobufUtil {
*/ */
public static ThrottleType toThrottleType(final QuotaProtos.ThrottleType proto) { public static ThrottleType toThrottleType(final QuotaProtos.ThrottleType proto) {
switch (proto) { switch (proto) {
case REQUEST_NUMBER: return ThrottleType.REQUEST_NUMBER; case REQUEST_NUMBER:
case REQUEST_SIZE: return ThrottleType.REQUEST_SIZE; return ThrottleType.REQUEST_NUMBER;
case WRITE_NUMBER: return ThrottleType.WRITE_NUMBER; case REQUEST_SIZE:
case WRITE_SIZE: return ThrottleType.WRITE_SIZE; return ThrottleType.REQUEST_SIZE;
case READ_NUMBER: return ThrottleType.READ_NUMBER; case REQUEST_CAPACITY_UNIT:
case READ_SIZE: return ThrottleType.READ_SIZE; return ThrottleType.REQUEST_CAPACITY_UNIT;
} case WRITE_NUMBER:
return ThrottleType.WRITE_NUMBER;
case WRITE_SIZE:
return ThrottleType.WRITE_SIZE;
case READ_NUMBER:
return ThrottleType.READ_NUMBER;
case READ_SIZE:
return ThrottleType.READ_SIZE;
case READ_CAPACITY_UNIT:
return ThrottleType.READ_CAPACITY_UNIT;
case WRITE_CAPACITY_UNIT:
return ThrottleType.WRITE_CAPACITY_UNIT;
default:
throw new RuntimeException("Invalid ThrottleType " + proto); throw new RuntimeException("Invalid ThrottleType " + proto);
} }
}
/** /**
* Convert a client ThrottleType to a protocol buffer ThrottleType * Convert a client ThrottleType to a protocol buffer ThrottleType
@ -2407,15 +2420,28 @@ public final class ProtobufUtil {
*/ */
public static QuotaProtos.ThrottleType toProtoThrottleType(final ThrottleType type) { public static QuotaProtos.ThrottleType toProtoThrottleType(final ThrottleType type) {
switch (type) { switch (type) {
case REQUEST_NUMBER: return QuotaProtos.ThrottleType.REQUEST_NUMBER; case REQUEST_NUMBER:
case REQUEST_SIZE: return QuotaProtos.ThrottleType.REQUEST_SIZE; return QuotaProtos.ThrottleType.REQUEST_NUMBER;
case WRITE_NUMBER: return QuotaProtos.ThrottleType.WRITE_NUMBER; case REQUEST_SIZE:
case WRITE_SIZE: return QuotaProtos.ThrottleType.WRITE_SIZE; return QuotaProtos.ThrottleType.REQUEST_SIZE;
case READ_NUMBER: return QuotaProtos.ThrottleType.READ_NUMBER; case WRITE_NUMBER:
case READ_SIZE: return QuotaProtos.ThrottleType.READ_SIZE; return QuotaProtos.ThrottleType.WRITE_NUMBER;
} case WRITE_SIZE:
return QuotaProtos.ThrottleType.WRITE_SIZE;
case READ_NUMBER:
return QuotaProtos.ThrottleType.READ_NUMBER;
case READ_SIZE:
return QuotaProtos.ThrottleType.READ_SIZE;
case REQUEST_CAPACITY_UNIT:
return QuotaProtos.ThrottleType.REQUEST_CAPACITY_UNIT;
case READ_CAPACITY_UNIT:
return QuotaProtos.ThrottleType.READ_CAPACITY_UNIT;
case WRITE_CAPACITY_UNIT:
return QuotaProtos.ThrottleType.WRITE_CAPACITY_UNIT;
default:
throw new RuntimeException("Invalid ThrottleType " + type); throw new RuntimeException("Invalid ThrottleType " + type);
} }
}
/** /**
* Convert a protocol buffer QuotaScope to a client QuotaScope * Convert a protocol buffer QuotaScope to a client QuotaScope

View File

@ -46,6 +46,9 @@ enum ThrottleType {
WRITE_SIZE = 4; WRITE_SIZE = 4;
READ_NUMBER = 5; READ_NUMBER = 5;
READ_SIZE = 6; READ_SIZE = 6;
REQUEST_CAPACITY_UNIT = 7;
WRITE_CAPACITY_UNIT = 8;
READ_CAPACITY_UNIT = 9;
} }
message Throttle { message Throttle {
@ -57,6 +60,10 @@ message Throttle {
optional TimedQuota read_num = 5; optional TimedQuota read_num = 5;
optional TimedQuota read_size = 6; optional TimedQuota read_size = 6;
optional TimedQuota req_capacity_unit = 7;
optional TimedQuota write_capacity_unit = 8;
optional TimedQuota read_capacity_unit = 9;
} }
message ThrottleRequest { message ThrottleRequest {

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.quotas;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability; import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -34,20 +35,29 @@ public class DefaultOperationQuota implements OperationQuota {
private static final Logger LOG = LoggerFactory.getLogger(DefaultOperationQuota.class); private static final Logger LOG = LoggerFactory.getLogger(DefaultOperationQuota.class);
private final List<QuotaLimiter> limiters; private final List<QuotaLimiter> limiters;
private final long writeCapacityUnit;
private final long readCapacityUnit;
private long writeAvailable = 0; private long writeAvailable = 0;
private long readAvailable = 0; private long readAvailable = 0;
private long writeConsumed = 0; private long writeConsumed = 0;
private long readConsumed = 0; private long readConsumed = 0;
private long writeCapacityUnitConsumed = 0;
private long readCapacityUnitConsumed = 0;
private final long[] operationSize; private final long[] operationSize;
public DefaultOperationQuota(final QuotaLimiter... limiters) { public DefaultOperationQuota(final Configuration conf, final QuotaLimiter... limiters) {
this(Arrays.asList(limiters)); this(conf, Arrays.asList(limiters));
} }
/** /**
* NOTE: The order matters. It should be something like [user, table, namespace, global] * NOTE: The order matters. It should be something like [user, table, namespace, global]
*/ */
public DefaultOperationQuota(final List<QuotaLimiter> limiters) { public DefaultOperationQuota(final Configuration conf, final List<QuotaLimiter> limiters) {
this.writeCapacityUnit =
conf.getLong(QuotaUtil.WRITE_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_WRITE_CAPACITY_UNIT);
this.readCapacityUnit =
conf.getLong(QuotaUtil.READ_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_READ_CAPACITY_UNIT);
this.limiters = limiters; this.limiters = limiters;
int size = OperationType.values().length; int size = OperationType.values().length;
operationSize = new long[size]; operationSize = new long[size];
@ -58,24 +68,28 @@ public class DefaultOperationQuota implements OperationQuota {
} }
@Override @Override
public void checkQuota(int numWrites, int numReads, int numScans) public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException {
throws RpcThrottlingException {
writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100); writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
readConsumed = estimateConsume(OperationType.GET, numReads, 100); readConsumed = estimateConsume(OperationType.GET, numReads, 100);
readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000); readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
writeAvailable = Long.MAX_VALUE; writeAvailable = Long.MAX_VALUE;
readAvailable = Long.MAX_VALUE; readAvailable = Long.MAX_VALUE;
for (final QuotaLimiter limiter : limiters) { for (final QuotaLimiter limiter : limiters) {
if (limiter.isBypass()) continue; if (limiter.isBypass()) continue;
limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed); limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
writeCapacityUnitConsumed, readCapacityUnitConsumed);
readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
writeAvailable = Math.min(writeAvailable, limiter.getWriteAvailable()); writeAvailable = Math.min(writeAvailable, limiter.getWriteAvailable());
} }
for (final QuotaLimiter limiter : limiters) { for (final QuotaLimiter limiter : limiters) {
limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed); limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed,
writeCapacityUnitConsumed, readCapacityUnitConsumed);
} }
} }
@ -83,12 +97,21 @@ public class DefaultOperationQuota implements OperationQuota {
public void close() { public void close() {
// Adjust the quota consumed for the specified operation // Adjust the quota consumed for the specified operation
long writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed; long writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
long readDiff = operationSize[OperationType.GET.ordinal()] + long readDiff = operationSize[OperationType.GET.ordinal()]
operationSize[OperationType.SCAN.ordinal()] - readConsumed; + operationSize[OperationType.SCAN.ordinal()] - readConsumed;
long writeCapacityUnitDiff = calculateWriteCapacityUnitDiff(
operationSize[OperationType.MUTATE.ordinal()], writeConsumed);
long readCapacityUnitDiff = calculateReadCapacityUnitDiff(
operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()],
readConsumed);
for (final QuotaLimiter limiter : limiters) { for (final QuotaLimiter limiter : limiters) {
if (writeDiff != 0) limiter.consumeWrite(writeDiff); if (writeDiff != 0) {
if (readDiff != 0) limiter.consumeRead(readDiff); limiter.consumeWrite(writeDiff, writeCapacityUnitDiff);
}
if (readDiff != 0) {
limiter.consumeRead(readDiff, readCapacityUnitDiff);
}
} }
} }
@ -123,4 +146,20 @@ public class DefaultOperationQuota implements OperationQuota {
} }
return 0; return 0;
} }
private long calculateWriteCapacityUnit(final long size) {
return (long) Math.ceil(size * 1.0 / this.writeCapacityUnit);
}
private long calculateReadCapacityUnit(final long size) {
return (long) Math.ceil(size * 1.0 / this.readCapacityUnit);
}
private long calculateWriteCapacityUnitDiff(final long actualSize, final long estimateSize) {
return calculateWriteCapacityUnit(actualSize) - calculateWriteCapacityUnit(estimateSize);
}
private long calculateReadCapacityUnitDiff(final long actualSize, final long estimateSize) {
return calculateReadCapacityUnit(actualSize) - calculateReadCapacityUnit(estimateSize);
}
} }

View File

@ -149,6 +149,16 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings {
case READ_SIZE: case READ_SIZE:
throttleBuilder.setReadSize(otherProto.getTimedQuota()); throttleBuilder.setReadSize(otherProto.getTimedQuota());
break; break;
case REQUEST_CAPACITY_UNIT:
throttleBuilder.setReqCapacityUnit(otherProto.getTimedQuota());
break;
case READ_CAPACITY_UNIT:
throttleBuilder.setReadCapacityUnit(otherProto.getTimedQuota());
break;
case WRITE_CAPACITY_UNIT:
throttleBuilder.setWriteCapacityUnit(otherProto.getTimedQuota());
break;
default:
} }
} }
} }
@ -232,6 +242,11 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings {
case READ_SIZE: case READ_SIZE:
builder.append(sizeToString(timedQuota.getSoftLimit())); builder.append(sizeToString(timedQuota.getSoftLimit()));
break; break;
case REQUEST_CAPACITY_UNIT:
case READ_CAPACITY_UNIT:
case WRITE_CAPACITY_UNIT:
builder.append(String.format("%dCU", timedQuota.getSoftLimit()));
default:
} }
} else if (timedQuota.hasShare()) { } else if (timedQuota.hasShare()) {
builder.append(String.format("%.2f%%", timedQuota.getShare())); builder.append(String.format("%.2f%%", timedQuota.getShare()));
@ -289,6 +304,15 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings {
if (proto.hasWriteSize()) { if (proto.hasWriteSize()) {
quotas.put(ThrottleType.WRITE_SIZE, proto.getWriteSize()); quotas.put(ThrottleType.WRITE_SIZE, proto.getWriteSize());
} }
if (proto.hasReqCapacityUnit()) {
quotas.put(ThrottleType.REQUEST_CAPACITY_UNIT, proto.getReqCapacityUnit());
}
if (proto.hasReadCapacityUnit()) {
quotas.put(ThrottleType.READ_CAPACITY_UNIT, proto.getReqCapacityUnit());
}
if (proto.hasWriteCapacityUnit()) {
quotas.put(ThrottleType.WRITE_CAPACITY_UNIT, proto.getWriteCapacityUnit());
}
return quotas; return quotas;
} }
@ -299,5 +323,8 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings {
builder.clearReqSize(); builder.clearReqSize();
builder.clearWriteNum(); builder.clearWriteNum();
builder.clearWriteSize(); builder.clearWriteSize();
builder.clearReadCapacityUnit();
builder.clearReadCapacityUnit();
builder.clearWriteCapacityUnit();
} }
} }

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.quotas;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability; import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
/** /**
* Noop quota limiter returned when no limiter is associated to the user/table * Noop quota limiter returned when no limiter is associated to the user/table
@ -36,22 +35,24 @@ class NoopQuotaLimiter implements QuotaLimiter {
@Override @Override
public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
long estimateReadSize) throws RpcThrottlingException { long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit)
throws RpcThrottlingException {
// no-op // no-op
} }
@Override @Override
public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize) { public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
long writeCapacityUnit, long readCapacityUnit) {
// no-op // no-op
} }
@Override @Override
public void consumeWrite(final long size) { public void consumeWrite(final long size, long capacityUnit) {
// no-op // no-op
} }
@Override @Override
public void consumeRead(final long size) { public void consumeRead(final long size, long capacityUnit) {
// no-op // no-op
} }

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.quotas;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability; import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
/** /**
* Internal interface used to interact with the user/table quota. * Internal interface used to interact with the user/table quota.
@ -35,10 +34,14 @@ public interface QuotaLimiter {
* @param estimateWriteSize the write size that will be checked against the available quota * @param estimateWriteSize the write size that will be checked against the available quota
* @param readReqs the read requests that will be checked against the available quota * @param readReqs the read requests that will be checked against the available quota
* @param estimateReadSize the read size that will be checked against the available quota * @param estimateReadSize the read size that will be checked against the available quota
* @param estimateWriteCapacityUnit the write capacity unit that will be checked against the
* available quota
* @param estimateReadCapacityUnit the read capacity unit that will be checked against the
* available quota
* @throws RpcThrottlingException thrown if not enough available resources to perform operation. * @throws RpcThrottlingException thrown if not enough available resources to perform operation.
*/ */
void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize) void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize,
throws RpcThrottlingException; long estimateWriteCapacityUnit, long estimateReadCapacityUnit) throws RpcThrottlingException;
/** /**
* Removes the specified write and read amount from the quota. * Removes the specified write and read amount from the quota.
@ -49,20 +52,23 @@ public interface QuotaLimiter {
* @param writeSize the write size that will be removed from the current quota * @param writeSize the write size that will be removed from the current quota
* @param readReqs the read requests that will be removed from the current quota * @param readReqs the read requests that will be removed from the current quota
* @param readSize the read size that will be removed from the current quota * @param readSize the read size that will be removed from the current quota
* @param writeCapacityUnit the write capacity unit that will be removed from the current quota
* @param readCapacityUnit the read capacity unit num that will be removed from the current quota
*/ */
void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize); void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
long writeCapacityUnit, long readCapacityUnit);
/** /**
* Removes or add back some write amount to the quota. * Removes or add back some write amount to the quota.
* (called at the end of an operation in case the estimate quota was off) * (called at the end of an operation in case the estimate quota was off)
*/ */
void consumeWrite(long size); void consumeWrite(long size, long capacityUnit);
/** /**
* Removes or add back some read amount to the quota. * Removes or add back some read amount to the quota.
* (called at the end of an operation in case the estimate quota was off) * (called at the end of an operation in case the estimate quota was off)
*/ */
void consumeRead(long size); void consumeRead(long size, long capacityUnit);
/** @return true if the limiter is a noop */ /** @return true if the limiter is a noop */
boolean isBypass(); boolean isBypass();

View File

@ -57,6 +57,13 @@ public class QuotaUtil extends QuotaTableUtil {
public static final String QUOTA_CONF_KEY = "hbase.quota.enabled"; public static final String QUOTA_CONF_KEY = "hbase.quota.enabled";
private static final boolean QUOTA_ENABLED_DEFAULT = false; private static final boolean QUOTA_ENABLED_DEFAULT = false;
public static final String READ_CAPACITY_UNIT_CONF_KEY = "hbase.quota.read.capacity.unit";
// the default one read capacity unit is 1024 bytes (1KB)
public static final long DEFAULT_READ_CAPACITY_UNIT = 1024;
public static final String WRITE_CAPACITY_UNIT_CONF_KEY = "hbase.quota.write.capacity.unit";
// the default one write capacity unit is 1024 bytes (1KB)
public static final long DEFAULT_WRITE_CAPACITY_UNIT = 1024;
/** Table descriptor for Quota internal table */ /** Table descriptor for Quota internal table */
public static final HTableDescriptor QUOTA_TABLE_DESC = public static final HTableDescriptor QUOTA_TABLE_DESC =
new HTableDescriptor(QUOTA_TABLE_NAME); new HTableDescriptor(QUOTA_TABLE_NAME);

View File

@ -102,7 +102,7 @@ public class RegionServerRpcQuotaManager {
LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter); LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter);
} }
if (!useNoop) { if (!useNoop) {
return new DefaultOperationQuota(userLimiter); return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter);
} }
} else { } else {
QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString()); QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
@ -113,7 +113,8 @@ public class RegionServerRpcQuotaManager {
userLimiter + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter); userLimiter + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter);
} }
if (!useNoop) { if (!useNoop) {
return new DefaultOperationQuota(userLimiter, tableLimiter, nsLimiter); return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter,
tableLimiter, nsLimiter);
} }
} }
} }

View File

@ -40,6 +40,9 @@ public class TimeBasedLimiter implements QuotaLimiter {
private RateLimiter writeSizeLimiter = null; private RateLimiter writeSizeLimiter = null;
private RateLimiter readReqsLimiter = null; private RateLimiter readReqsLimiter = null;
private RateLimiter readSizeLimiter = null; private RateLimiter readSizeLimiter = null;
private RateLimiter reqCapacityUnitLimiter = null;
private RateLimiter writeCapacityUnitLimiter = null;
private RateLimiter readCapacityUnitLimiter = null;
private TimeBasedLimiter() { private TimeBasedLimiter() {
if (FixedIntervalRateLimiter.class.getName().equals( if (FixedIntervalRateLimiter.class.getName().equals(
@ -51,6 +54,9 @@ public class TimeBasedLimiter implements QuotaLimiter {
writeSizeLimiter = new FixedIntervalRateLimiter(); writeSizeLimiter = new FixedIntervalRateLimiter();
readReqsLimiter = new FixedIntervalRateLimiter(); readReqsLimiter = new FixedIntervalRateLimiter();
readSizeLimiter = new FixedIntervalRateLimiter(); readSizeLimiter = new FixedIntervalRateLimiter();
reqCapacityUnitLimiter = new FixedIntervalRateLimiter();
writeCapacityUnitLimiter = new FixedIntervalRateLimiter();
readCapacityUnitLimiter = new FixedIntervalRateLimiter();
} else { } else {
reqsLimiter = new AverageIntervalRateLimiter(); reqsLimiter = new AverageIntervalRateLimiter();
reqSizeLimiter = new AverageIntervalRateLimiter(); reqSizeLimiter = new AverageIntervalRateLimiter();
@ -58,6 +64,9 @@ public class TimeBasedLimiter implements QuotaLimiter {
writeSizeLimiter = new AverageIntervalRateLimiter(); writeSizeLimiter = new AverageIntervalRateLimiter();
readReqsLimiter = new AverageIntervalRateLimiter(); readReqsLimiter = new AverageIntervalRateLimiter();
readSizeLimiter = new AverageIntervalRateLimiter(); readSizeLimiter = new AverageIntervalRateLimiter();
reqCapacityUnitLimiter = new AverageIntervalRateLimiter();
writeCapacityUnitLimiter = new AverageIntervalRateLimiter();
readCapacityUnitLimiter = new AverageIntervalRateLimiter();
} }
} }
@ -93,6 +102,21 @@ public class TimeBasedLimiter implements QuotaLimiter {
setFromTimedQuota(limiter.readSizeLimiter, throttle.getReadSize()); setFromTimedQuota(limiter.readSizeLimiter, throttle.getReadSize());
isBypass = false; isBypass = false;
} }
if (throttle.hasReqCapacityUnit()) {
setFromTimedQuota(limiter.reqCapacityUnitLimiter, throttle.getReqCapacityUnit());
isBypass = false;
}
if (throttle.hasWriteCapacityUnit()) {
setFromTimedQuota(limiter.writeCapacityUnitLimiter, throttle.getWriteCapacityUnit());
isBypass = false;
}
if (throttle.hasReadCapacityUnit()) {
setFromTimedQuota(limiter.readCapacityUnitLimiter, throttle.getReadCapacityUnit());
isBypass = false;
}
return isBypass ? NoopQuotaLimiter.get() : limiter; return isBypass ? NoopQuotaLimiter.get() : limiter;
} }
@ -103,6 +127,9 @@ public class TimeBasedLimiter implements QuotaLimiter {
writeSizeLimiter.update(other.writeSizeLimiter); writeSizeLimiter.update(other.writeSizeLimiter);
readReqsLimiter.update(other.readReqsLimiter); readReqsLimiter.update(other.readReqsLimiter);
readSizeLimiter.update(other.readSizeLimiter); readSizeLimiter.update(other.readSizeLimiter);
reqCapacityUnitLimiter.update(other.reqCapacityUnitLimiter);
writeCapacityUnitLimiter.update(other.writeCapacityUnitLimiter);
readCapacityUnitLimiter.update(other.readCapacityUnitLimiter);
} }
private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuota timedQuota) { private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuota timedQuota) {
@ -111,7 +138,8 @@ public class TimeBasedLimiter implements QuotaLimiter {
@Override @Override
public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
long estimateReadSize) throws RpcThrottlingException { long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit)
throws RpcThrottlingException {
if (!reqsLimiter.canExecute(writeReqs + readReqs)) { if (!reqsLimiter.canExecute(writeReqs + readReqs)) {
RpcThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval()); RpcThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval());
} }
@ -119,6 +147,10 @@ public class TimeBasedLimiter implements QuotaLimiter {
RpcThrottlingException.throwRequestSizeExceeded( RpcThrottlingException.throwRequestSizeExceeded(
reqSizeLimiter.waitInterval(estimateWriteSize + estimateReadSize)); reqSizeLimiter.waitInterval(estimateWriteSize + estimateReadSize));
} }
if (!reqCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit + estimateReadCapacityUnit)) {
RpcThrottlingException.throwRequestSizeExceeded(
reqCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit + estimateReadCapacityUnit));
}
if (estimateWriteSize > 0) { if (estimateWriteSize > 0) {
if (!writeReqsLimiter.canExecute(writeReqs)) { if (!writeReqsLimiter.canExecute(writeReqs)) {
@ -128,6 +160,10 @@ public class TimeBasedLimiter implements QuotaLimiter {
RpcThrottlingException.throwWriteSizeExceeded( RpcThrottlingException.throwWriteSizeExceeded(
writeSizeLimiter.waitInterval(estimateWriteSize)); writeSizeLimiter.waitInterval(estimateWriteSize));
} }
if (!writeCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit)) {
RpcThrottlingException.throwWriteSizeExceeded(
writeCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit));
}
} }
if (estimateReadSize > 0) { if (estimateReadSize > 0) {
@ -138,11 +174,16 @@ public class TimeBasedLimiter implements QuotaLimiter {
RpcThrottlingException.throwReadSizeExceeded( RpcThrottlingException.throwReadSizeExceeded(
readSizeLimiter.waitInterval(estimateReadSize)); readSizeLimiter.waitInterval(estimateReadSize));
} }
if (!readCapacityUnitLimiter.canExecute(estimateReadCapacityUnit)) {
RpcThrottlingException
.throwWriteSizeExceeded(readCapacityUnitLimiter.waitInterval(estimateReadCapacityUnit));
}
} }
} }
@Override @Override
public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize) { public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize,
long writeCapacityUnit, long readCapacityUnit) {
assert writeSize != 0 || readSize != 0; assert writeSize != 0 || readSize != 0;
reqsLimiter.consume(writeReqs + readReqs); reqsLimiter.consume(writeReqs + readReqs);
@ -156,18 +197,30 @@ public class TimeBasedLimiter implements QuotaLimiter {
readReqsLimiter.consume(readReqs); readReqsLimiter.consume(readReqs);
readSizeLimiter.consume(readSize); readSizeLimiter.consume(readSize);
} }
if (writeCapacityUnit > 0) {
reqCapacityUnitLimiter.consume(writeCapacityUnit);
writeCapacityUnitLimiter.consume(writeCapacityUnit);
}
if (readCapacityUnit > 0) {
reqCapacityUnitLimiter.consume(readCapacityUnit);
readCapacityUnitLimiter.consume(readCapacityUnit);
}
} }
@Override @Override
public void consumeWrite(final long size) { public void consumeWrite(final long size, long capacityUnit) {
reqSizeLimiter.consume(size); reqSizeLimiter.consume(size);
writeSizeLimiter.consume(size); writeSizeLimiter.consume(size);
reqCapacityUnitLimiter.consume(capacityUnit);
writeCapacityUnitLimiter.consume(capacityUnit);
} }
@Override @Override
public void consumeRead(final long size) { public void consumeRead(final long size, long capacityUnit) {
reqSizeLimiter.consume(size); reqSizeLimiter.consume(size);
readSizeLimiter.consume(size); readSizeLimiter.consume(size);
reqCapacityUnitLimiter.consume(capacityUnit);
readCapacityUnitLimiter.consume(capacityUnit);
} }
@Override @Override
@ -189,12 +242,33 @@ public class TimeBasedLimiter implements QuotaLimiter {
public String toString() { public String toString() {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
builder.append("TimeBasedLimiter("); builder.append("TimeBasedLimiter(");
if (!reqsLimiter.isBypass()) builder.append("reqs=" + reqsLimiter); if (!reqsLimiter.isBypass()) {
if (!reqSizeLimiter.isBypass()) builder.append(" resSize=" + reqSizeLimiter); builder.append("reqs=" + reqsLimiter);
if (!writeReqsLimiter.isBypass()) builder.append(" writeReqs=" + writeReqsLimiter); }
if (!writeSizeLimiter.isBypass()) builder.append(" writeSize=" + writeSizeLimiter); if (!reqSizeLimiter.isBypass()) {
if (!readReqsLimiter.isBypass()) builder.append(" readReqs=" + readReqsLimiter); builder.append(" resSize=" + reqSizeLimiter);
if (!readSizeLimiter.isBypass()) builder.append(" readSize=" + readSizeLimiter); }
if (!writeReqsLimiter.isBypass()) {
builder.append(" writeReqs=" + writeReqsLimiter);
}
if (!writeSizeLimiter.isBypass()) {
builder.append(" writeSize=" + writeSizeLimiter);
}
if (!readReqsLimiter.isBypass()) {
builder.append(" readReqs=" + readReqsLimiter);
}
if (!readSizeLimiter.isBypass()) {
builder.append(" readSize=" + readSizeLimiter);
}
if (!reqCapacityUnitLimiter.isBypass()) {
builder.append(" reqCapacityUnit=" + reqCapacityUnitLimiter);
}
if (!writeCapacityUnitLimiter.isBypass()) {
builder.append(" writeCapacityUnit=" + writeCapacityUnitLimiter);
}
if (!readCapacityUnitLimiter.isBypass()) {
builder.append(" readCapacityUnit=" + readCapacityUnitLimiter);
}
builder.append(')'); builder.append(')');
return builder.toString(); return builder.toString();
} }

View File

@ -455,17 +455,22 @@ public class TestQuotaAdmin {
@Test @Test
public void testSetGetRemoveRPCQuota() throws Exception { public void testSetGetRemoveRPCQuota() throws Exception {
testSetGetRemoveRPCQuota(ThrottleType.REQUEST_SIZE);
testSetGetRemoveRPCQuota(ThrottleType.REQUEST_CAPACITY_UNIT);
}
private void testSetGetRemoveRPCQuota(ThrottleType throttleType) throws Exception {
Admin admin = TEST_UTIL.getAdmin(); Admin admin = TEST_UTIL.getAdmin();
final TableName tn = TableName.valueOf("sq_table1"); final TableName tn = TableName.valueOf("sq_table1");
QuotaSettings settings = QuotaSettings settings =
QuotaSettingsFactory.throttleTable(tn, ThrottleType.REQUEST_SIZE, 2L, TimeUnit.HOURS); QuotaSettingsFactory.throttleTable(tn, throttleType, 2L, TimeUnit.HOURS);
admin.setQuota(settings); admin.setQuota(settings);
// Verify the Quota in the table // Verify the Quota in the table
verifyRecordPresentInQuotaTable(ThrottleType.REQUEST_SIZE, 2L, TimeUnit.HOURS); verifyRecordPresentInQuotaTable(throttleType, 2L, TimeUnit.HOURS);
// Verify we can retrieve it via the QuotaRetriever API // Verify we can retrieve it via the QuotaRetriever API
verifyFetchableViaAPI(admin, ThrottleType.REQUEST_SIZE, 2L, TimeUnit.HOURS); verifyFetchableViaAPI(admin, throttleType, 2L, TimeUnit.HOURS);
// Now, remove the quota // Now, remove the quota
QuotaSettings removeQuota = QuotaSettingsFactory.unthrottleTable(tn); QuotaSettings removeQuota = QuotaSettingsFactory.unthrottleTable(tn);
@ -584,6 +589,19 @@ public class TestQuotaAdmin {
assertTrue(rpcQuota.hasWriteSize()); assertTrue(rpcQuota.hasWriteSize());
t = rpcQuota.getWriteSize(); t = rpcQuota.getWriteSize();
break; break;
case REQUEST_CAPACITY_UNIT:
assertTrue(rpcQuota.hasReqCapacityUnit());
t = rpcQuota.getReqCapacityUnit();
break;
case READ_CAPACITY_UNIT:
assertTrue(rpcQuota.hasReadCapacityUnit());
t = rpcQuota.getReadCapacityUnit();
break;
case WRITE_CAPACITY_UNIT:
assertTrue(rpcQuota.hasWriteCapacityUnit());
t = rpcQuota.getWriteCapacityUnit();
break;
default:
} }
assertEquals(t.getSoftLimit(), limit); assertEquals(t.getSoftLimit(), limit);

View File

@ -224,7 +224,7 @@ public class TestQuotaState {
assertFalse(quotaInfo.isBypass()); assertFalse(quotaInfo.isBypass());
QuotaLimiter limiter = quotaInfo.getTableLimiter(TABLE_A); QuotaLimiter limiter = quotaInfo.getTableLimiter(TABLE_A);
try { try {
limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0); limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0, 1, 0);
fail("Should have thrown RpcThrottlingException"); fail("Should have thrown RpcThrottlingException");
} catch (RpcThrottlingException e) { } catch (RpcThrottlingException e) {
// expected // expected
@ -242,7 +242,7 @@ public class TestQuotaState {
private void assertThrottleException(final QuotaLimiter limiter, final int availReqs) { private void assertThrottleException(final QuotaLimiter limiter, final int availReqs) {
assertNoThrottleException(limiter, availReqs); assertNoThrottleException(limiter, availReqs);
try { try {
limiter.checkQuota(1, 1, 0, 0); limiter.checkQuota(1, 1, 0, 0, 1, 0);
fail("Should have thrown RpcThrottlingException"); fail("Should have thrown RpcThrottlingException");
} catch (RpcThrottlingException e) { } catch (RpcThrottlingException e) {
// expected // expected
@ -252,11 +252,11 @@ public class TestQuotaState {
private void assertNoThrottleException(final QuotaLimiter limiter, final int availReqs) { private void assertNoThrottleException(final QuotaLimiter limiter, final int availReqs) {
for (int i = 0; i < availReqs; ++i) { for (int i = 0; i < availReqs; ++i) {
try { try {
limiter.checkQuota(1, 1, 0, 0); limiter.checkQuota(1, 1, 0, 0, 1, 0);
} catch (RpcThrottlingException e) { } catch (RpcThrottlingException e) {
fail("Unexpected RpcThrottlingException after " + i + " requests. limit=" + availReqs); fail("Unexpected RpcThrottlingException after " + i + " requests. limit=" + availReqs);
} }
limiter.grabQuota(1, 1, 0, 0); limiter.grabQuota(1, 1, 0, 0, 1, 0);
} }
} }

View File

@ -509,12 +509,66 @@ public class TestQuotaThrottle {
assertEquals(30, doGets(30, tables[1])); assertEquals(30, doGets(30, tables[1]));
} }
@Test
public void testTableWriteCapacityUnitThrottle() throws Exception {
final Admin admin = TEST_UTIL.getAdmin();
// Add 6CU/min limit
admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0],
ThrottleType.WRITE_CAPACITY_UNIT, 6, TimeUnit.MINUTES));
triggerTableCacheRefresh(false, TABLE_NAMES[0]);
// should execute at max 6 capacity units because each put size is 1 capacity unit
assertEquals(6, doPuts(20, 10, tables[0]));
// wait a minute and you should execute at max 3 capacity units because each put size is 2
// capacity unit
waitMinuteQuota();
assertEquals(3, doPuts(20, 1025, tables[0]));
admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0]));
triggerTableCacheRefresh(true, TABLE_NAMES[0]);
}
@Test
public void testTableReadCapacityUnitThrottle() throws Exception {
final Admin admin = TEST_UTIL.getAdmin();
// Add 6CU/min limit
admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0],
ThrottleType.READ_CAPACITY_UNIT, 6, TimeUnit.MINUTES));
triggerTableCacheRefresh(false, TABLE_NAMES[0]);
assertEquals(20, doPuts(20, 10, tables[0]));
// should execute at max 6 capacity units because each get size is 1 capacity unit
assertEquals(6, doGets(20, tables[0]));
assertEquals(20, doPuts(20, 2015, tables[0]));
// wait a minute and you should execute at max 3 capacity units because each get size is 2
// capacity unit on tables[0]
waitMinuteQuota();
assertEquals(3, doGets(20, tables[0]));
admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0]));
triggerTableCacheRefresh(true, TABLE_NAMES[0]);
}
private int doPuts(int maxOps, final Table... tables) throws Exception { private int doPuts(int maxOps, final Table... tables) throws Exception {
return doPuts(maxOps, -1, tables);
}
private int doPuts(int maxOps, int valueSize, final Table... tables) throws Exception {
int count = 0; int count = 0;
try { try {
while (count < maxOps) { while (count < maxOps) {
Put put = new Put(Bytes.toBytes("row-" + count)); Put put = new Put(Bytes.toBytes("row-" + count));
put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("data-" + count)); byte[] value;
if (valueSize < 0) {
value = Bytes.toBytes("data-" + count);
} else {
value = generateValue(valueSize);
}
put.addColumn(FAMILY, QUALIFIER, value);
for (final Table table : tables) { for (final Table table : tables) {
table.put(put); table.put(put);
} }
@ -526,6 +580,14 @@ public class TestQuotaThrottle {
return count; return count;
} }
private byte[] generateValue(int valueSize) {
byte[] bytes = new byte[valueSize];
for (int i = 0; i < valueSize; i++) {
bytes[i] = 'a';
}
return bytes;
}
private long doGets(int maxOps, final Table... tables) throws Exception { private long doGets(int maxOps, final Table... tables) throws Exception {
int count = 0; int count = 0;
try { try {

View File

@ -263,11 +263,14 @@ module Hbase
def _parse_limit(str_limit, type_cls, type) def _parse_limit(str_limit, type_cls, type)
str_limit = str_limit.downcase str_limit = str_limit.downcase
match = /(\d+)(req|[bkmgtp])\/(sec|min|hour|day)/.match(str_limit) match = /(\d+)(req|cu|[bkmgtp])\/(sec|min|hour|day)/.match(str_limit)
if match if match
if match[2] == 'req' if match[2] == 'req'
limit = match[1].to_i limit = match[1].to_i
type = type_cls.valueOf(type + '_NUMBER') type = type_cls.valueOf(type + '_NUMBER')
elsif match[2] == 'cu'
limit = match[1].to_i
type = type_cls.valueOf(type + '_CAPACITY_UNIT')
else else
limit = _size_from_str(match[1].to_i, match[2]) limit = _size_from_str(match[1].to_i, match[2])
type = type_cls.valueOf(type + '_SIZE') type = type_cls.valueOf(type + '_SIZE')

View File

@ -26,11 +26,12 @@ Set a quota for a user, table, or namespace.
Syntax : set_quota TYPE => <type>, <args> Syntax : set_quota TYPE => <type>, <args>
TYPE => THROTTLE TYPE => THROTTLE
User can either set quota on read, write or on both the requests together(i.e., read+write) User can either set quota on read, write or on both the requests together(i.e., read+write).
The read, write, or read+write(default throttle type) request limit can be expressed using The read, write, or read+write(default throttle type) request limit can be expressed using
the form 100req/sec, 100req/min and the read, write, read+write(default throttle type) limit the form 100req/sec, 100req/min; the read, write, read+write(default throttle type) limit
can be expressed using the form 100k/sec, 100M/min with (B, K, M, G, T, P) as valid size unit can be expressed using the form 100k/sec, 100M/min with (B, K, M, G, T, P) as valid size unit
and (sec, min, hour, day) as valid time unit. ; the read, write, read+write(default throttle type) limit can be expressed using the form
100CU/sec as capacity unit. The valid time units are (sec, min, hour, day).
Currently the throttle limit is per machine - a limit of 100req/min Currently the throttle limit is per machine - a limit of 100req/min
means that each machine can execute 100req/min. means that each machine can execute 100req/min.
@ -42,6 +43,9 @@ For example:
hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10M/sec' hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10M/sec'
hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => WRITE, USER => 'u1', LIMIT => '10M/sec' hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => WRITE, USER => 'u1', LIMIT => '10M/sec'
hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10CU/sec'
hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => WRITE, USER => 'u1', LIMIT => '10CU/sec'
hbase> set_quota TYPE => THROTTLE, USER => 'u1', TABLE => 't2', LIMIT => '5K/min' hbase> set_quota TYPE => THROTTLE, USER => 'u1', TABLE => 't2', LIMIT => '5K/min'
hbase> set_quota TYPE => THROTTLE, USER => 'u1', NAMESPACE => 'ns2', LIMIT => NONE hbase> set_quota TYPE => THROTTLE, USER => 'u1', NAMESPACE => 'ns2', LIMIT => NONE

View File

@ -136,5 +136,32 @@ module Hbase
assert(output.include? snapshot1) assert(output.include? snapshot1)
assert(output.include? snapshot2) assert(output.include? snapshot2)
end end
define_test 'can set and remove user CU quota' do
command(:set_quota, TYPE => THROTTLE, USER => 'user1', LIMIT => '1CU/sec')
output = capture_stdout{ command(:list_quotas) }
assert(output.include?('USER => user1'))
assert(output.include?('TYPE => THROTTLE'))
assert(output.include?('THROTTLE_TYPE => REQUEST_CAPACITY_UNIT'))
assert(output.include?('LIMIT => 1CU/sec'))
command(:set_quota, TYPE => THROTTLE, USER => 'user1', LIMIT => NONE)
output = capture_stdout{ command(:list_quotas) }
assert(output.include?('0 row(s)'))
end
define_test 'can set and remove table CU quota' do
command(:set_quota, TYPE => THROTTLE, TABLE => @test_name,
THROTTLE_TYPE => WRITE, LIMIT => '2CU/min')
output = capture_stdout{ command(:list_quotas) }
assert(output.include?('TABLE => hbase_shell_quota_tests_table'))
assert(output.include?('TYPE => THROTTLE'))
assert(output.include?('THROTTLE_TYPE => WRITE_CAPACITY_UNIT'))
assert(output.include?('LIMIT => 2CU/min'))
command(:set_quota, TYPE => THROTTLE, TABLE => @test_name, LIMIT => NONE)
output = capture_stdout{ command(:list_quotas) }
assert(output.include?('0 row(s)'))
end
end end
end end