diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java index 2a20c51e672..14d1ad31b3d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java @@ -143,6 +143,18 @@ public class QuotaSettingsFactory { settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace, ThrottleType.READ_SIZE, throttle.getReadSize())); } + if (throttle.hasReqCapacityUnit()) { + settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace, + ThrottleType.REQUEST_CAPACITY_UNIT, throttle.getReqCapacityUnit())); + } + if (throttle.hasReadCapacityUnit()) { + settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace, + ThrottleType.READ_CAPACITY_UNIT, throttle.getReadCapacityUnit())); + } + if (throttle.hasWriteCapacityUnit()) { + settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace, + ThrottleType.WRITE_CAPACITY_UNIT, throttle.getWriteCapacityUnit())); + } return settings; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java index e424d8a0b1c..05fb70bfd50 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java @@ -95,6 +95,12 @@ class ThrottleSettings extends QuotaSettings { case READ_SIZE: builder.append(sizeToString(timedQuota.getSoftLimit())); break; + case REQUEST_CAPACITY_UNIT: + case READ_CAPACITY_UNIT: + case WRITE_CAPACITY_UNIT: + builder.append(String.format("%dCU", timedQuota.getSoftLimit())); + break; + default: } } else if (timedQuota.hasShare()) { builder.append(String.format("%.2f%%", timedQuota.getShare())); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java index 0b0ee609ebf..ec5b32dbf2a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java @@ -41,4 +41,13 @@ public enum ThrottleType { /** Throttling based on the read data size */ READ_SIZE, + + /** Throttling based on the read+write capacity unit */ + REQUEST_CAPACITY_UNIT, + + /** Throttling based on the write data capacity unit */ + WRITE_CAPACITY_UNIT, + + /** Throttling based on the read data capacity unit */ + READ_CAPACITY_UNIT, } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 87eff009ef9..ee60d87af0f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -2389,14 +2389,27 @@ public final class ProtobufUtil { */ public static ThrottleType toThrottleType(final QuotaProtos.ThrottleType proto) { switch (proto) { - case REQUEST_NUMBER: return ThrottleType.REQUEST_NUMBER; - case REQUEST_SIZE: return ThrottleType.REQUEST_SIZE; - case WRITE_NUMBER: return ThrottleType.WRITE_NUMBER; - case WRITE_SIZE: return ThrottleType.WRITE_SIZE; - case READ_NUMBER: return ThrottleType.READ_NUMBER; - case READ_SIZE: return ThrottleType.READ_SIZE; + case REQUEST_NUMBER: + return ThrottleType.REQUEST_NUMBER; + case REQUEST_SIZE: + return ThrottleType.REQUEST_SIZE; + case REQUEST_CAPACITY_UNIT: + return ThrottleType.REQUEST_CAPACITY_UNIT; + case WRITE_NUMBER: + return ThrottleType.WRITE_NUMBER; + case WRITE_SIZE: + return ThrottleType.WRITE_SIZE; + case READ_NUMBER: + return ThrottleType.READ_NUMBER; + case READ_SIZE: + return ThrottleType.READ_SIZE; + case READ_CAPACITY_UNIT: + return ThrottleType.READ_CAPACITY_UNIT; + case WRITE_CAPACITY_UNIT: + return ThrottleType.WRITE_CAPACITY_UNIT; + default: + throw new RuntimeException("Invalid ThrottleType " + proto); } - throw new RuntimeException("Invalid ThrottleType " + proto); } /** @@ -2407,14 +2420,27 @@ public final class ProtobufUtil { */ public static QuotaProtos.ThrottleType toProtoThrottleType(final ThrottleType type) { switch (type) { - case REQUEST_NUMBER: return QuotaProtos.ThrottleType.REQUEST_NUMBER; - case REQUEST_SIZE: return QuotaProtos.ThrottleType.REQUEST_SIZE; - case WRITE_NUMBER: return QuotaProtos.ThrottleType.WRITE_NUMBER; - case WRITE_SIZE: return QuotaProtos.ThrottleType.WRITE_SIZE; - case READ_NUMBER: return QuotaProtos.ThrottleType.READ_NUMBER; - case READ_SIZE: return QuotaProtos.ThrottleType.READ_SIZE; + case REQUEST_NUMBER: + return QuotaProtos.ThrottleType.REQUEST_NUMBER; + case REQUEST_SIZE: + return QuotaProtos.ThrottleType.REQUEST_SIZE; + case WRITE_NUMBER: + return QuotaProtos.ThrottleType.WRITE_NUMBER; + case WRITE_SIZE: + return QuotaProtos.ThrottleType.WRITE_SIZE; + case READ_NUMBER: + return QuotaProtos.ThrottleType.READ_NUMBER; + case READ_SIZE: + return QuotaProtos.ThrottleType.READ_SIZE; + case REQUEST_CAPACITY_UNIT: + return QuotaProtos.ThrottleType.REQUEST_CAPACITY_UNIT; + case READ_CAPACITY_UNIT: + return QuotaProtos.ThrottleType.READ_CAPACITY_UNIT; + case WRITE_CAPACITY_UNIT: + return QuotaProtos.ThrottleType.WRITE_CAPACITY_UNIT; + default: + throw new RuntimeException("Invalid ThrottleType " + type); } - throw new RuntimeException("Invalid ThrottleType " + type); } /** diff --git a/hbase-protocol-shaded/src/main/protobuf/Quota.proto b/hbase-protocol-shaded/src/main/protobuf/Quota.proto index cd4c7df0b77..5b00d74980b 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Quota.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Quota.proto @@ -46,6 +46,9 @@ enum ThrottleType { WRITE_SIZE = 4; READ_NUMBER = 5; READ_SIZE = 6; + REQUEST_CAPACITY_UNIT = 7; + WRITE_CAPACITY_UNIT = 8; + READ_CAPACITY_UNIT = 9; } message Throttle { @@ -57,6 +60,10 @@ message Throttle { optional TimedQuota read_num = 5; optional TimedQuota read_size = 6; + + optional TimedQuota req_capacity_unit = 7; + optional TimedQuota write_capacity_unit = 8; + optional TimedQuota read_capacity_unit = 9; } message ThrottleRequest { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index 1265a4223a6..f9b3ca5c29b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.quotas; import java.util.Arrays; import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; @@ -34,20 +35,29 @@ public class DefaultOperationQuota implements OperationQuota { private static final Logger LOG = LoggerFactory.getLogger(DefaultOperationQuota.class); private final List limiters; + private final long writeCapacityUnit; + private final long readCapacityUnit; + private long writeAvailable = 0; private long readAvailable = 0; private long writeConsumed = 0; private long readConsumed = 0; + private long writeCapacityUnitConsumed = 0; + private long readCapacityUnitConsumed = 0; private final long[] operationSize; - public DefaultOperationQuota(final QuotaLimiter... limiters) { - this(Arrays.asList(limiters)); + public DefaultOperationQuota(final Configuration conf, final QuotaLimiter... limiters) { + this(conf, Arrays.asList(limiters)); } /** * NOTE: The order matters. It should be something like [user, table, namespace, global] */ - public DefaultOperationQuota(final List limiters) { + public DefaultOperationQuota(final Configuration conf, final List limiters) { + this.writeCapacityUnit = + conf.getLong(QuotaUtil.WRITE_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_WRITE_CAPACITY_UNIT); + this.readCapacityUnit = + conf.getLong(QuotaUtil.READ_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_READ_CAPACITY_UNIT); this.limiters = limiters; int size = OperationType.values().length; operationSize = new long[size]; @@ -58,24 +68,28 @@ public class DefaultOperationQuota implements OperationQuota { } @Override - public void checkQuota(int numWrites, int numReads, int numScans) - throws RpcThrottlingException { + public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException { writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100); - readConsumed = estimateConsume(OperationType.GET, numReads, 100); + readConsumed = estimateConsume(OperationType.GET, numReads, 100); readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000); + writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed); + readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed); + writeAvailable = Long.MAX_VALUE; readAvailable = Long.MAX_VALUE; - for (final QuotaLimiter limiter: limiters) { + for (final QuotaLimiter limiter : limiters) { if (limiter.isBypass()) continue; - limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed); + limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, + writeCapacityUnitConsumed, readCapacityUnitConsumed); readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); writeAvailable = Math.min(writeAvailable, limiter.getWriteAvailable()); } - for (final QuotaLimiter limiter: limiters) { - limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed); + for (final QuotaLimiter limiter : limiters) { + limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, + writeCapacityUnitConsumed, readCapacityUnitConsumed); } } @@ -83,12 +97,21 @@ public class DefaultOperationQuota implements OperationQuota { public void close() { // Adjust the quota consumed for the specified operation long writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed; - long readDiff = operationSize[OperationType.GET.ordinal()] + - operationSize[OperationType.SCAN.ordinal()] - readConsumed; + long readDiff = operationSize[OperationType.GET.ordinal()] + + operationSize[OperationType.SCAN.ordinal()] - readConsumed; + long writeCapacityUnitDiff = calculateWriteCapacityUnitDiff( + operationSize[OperationType.MUTATE.ordinal()], writeConsumed); + long readCapacityUnitDiff = calculateReadCapacityUnitDiff( + operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()], + readConsumed); - for (final QuotaLimiter limiter: limiters) { - if (writeDiff != 0) limiter.consumeWrite(writeDiff); - if (readDiff != 0) limiter.consumeRead(readDiff); + for (final QuotaLimiter limiter : limiters) { + if (writeDiff != 0) { + limiter.consumeWrite(writeDiff, writeCapacityUnitDiff); + } + if (readDiff != 0) { + limiter.consumeRead(readDiff, readCapacityUnitDiff); + } } } @@ -123,4 +146,20 @@ public class DefaultOperationQuota implements OperationQuota { } return 0; } + + private long calculateWriteCapacityUnit(final long size) { + return (long) Math.ceil(size * 1.0 / this.writeCapacityUnit); + } + + private long calculateReadCapacityUnit(final long size) { + return (long) Math.ceil(size * 1.0 / this.readCapacityUnit); + } + + private long calculateWriteCapacityUnitDiff(final long actualSize, final long estimateSize) { + return calculateWriteCapacityUnit(actualSize) - calculateWriteCapacityUnit(estimateSize); + } + + private long calculateReadCapacityUnitDiff(final long actualSize, final long estimateSize) { + return calculateReadCapacityUnit(actualSize) - calculateReadCapacityUnit(estimateSize); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java index 311969154e4..0c6cb81b8fc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java @@ -149,6 +149,16 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings { case READ_SIZE: throttleBuilder.setReadSize(otherProto.getTimedQuota()); break; + case REQUEST_CAPACITY_UNIT: + throttleBuilder.setReqCapacityUnit(otherProto.getTimedQuota()); + break; + case READ_CAPACITY_UNIT: + throttleBuilder.setReadCapacityUnit(otherProto.getTimedQuota()); + break; + case WRITE_CAPACITY_UNIT: + throttleBuilder.setWriteCapacityUnit(otherProto.getTimedQuota()); + break; + default: } } } @@ -232,6 +242,11 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings { case READ_SIZE: builder.append(sizeToString(timedQuota.getSoftLimit())); break; + case REQUEST_CAPACITY_UNIT: + case READ_CAPACITY_UNIT: + case WRITE_CAPACITY_UNIT: + builder.append(String.format("%dCU", timedQuota.getSoftLimit())); + default: } } else if (timedQuota.hasShare()) { builder.append(String.format("%.2f%%", timedQuota.getShare())); @@ -289,6 +304,15 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings { if (proto.hasWriteSize()) { quotas.put(ThrottleType.WRITE_SIZE, proto.getWriteSize()); } + if (proto.hasReqCapacityUnit()) { + quotas.put(ThrottleType.REQUEST_CAPACITY_UNIT, proto.getReqCapacityUnit()); + } + if (proto.hasReadCapacityUnit()) { + quotas.put(ThrottleType.READ_CAPACITY_UNIT, proto.getReqCapacityUnit()); + } + if (proto.hasWriteCapacityUnit()) { + quotas.put(ThrottleType.WRITE_CAPACITY_UNIT, proto.getWriteCapacityUnit()); + } return quotas; } @@ -299,5 +323,8 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings { builder.clearReqSize(); builder.clearWriteNum(); builder.clearWriteSize(); + builder.clearReadCapacityUnit(); + builder.clearReadCapacityUnit(); + builder.clearWriteCapacityUnit(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java index 3cca9559c13..71dd3c75822 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.quotas; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; -import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType; /** * Noop quota limiter returned when no limiter is associated to the user/table @@ -36,22 +35,24 @@ class NoopQuotaLimiter implements QuotaLimiter { @Override public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, - long estimateReadSize) throws RpcThrottlingException { + long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit) + throws RpcThrottlingException { // no-op } @Override - public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize) { + public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize, + long writeCapacityUnit, long readCapacityUnit) { // no-op } @Override - public void consumeWrite(final long size) { + public void consumeWrite(final long size, long capacityUnit) { // no-op } @Override - public void consumeRead(final long size) { + public void consumeRead(final long size, long capacityUnit) { // no-op } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java index 7cb29b31547..9260ec2faab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.quotas; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; -import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType; /** * Internal interface used to interact with the user/table quota. @@ -35,10 +34,14 @@ public interface QuotaLimiter { * @param estimateWriteSize the write size that will be checked against the available quota * @param readReqs the read requests that will be checked against the available quota * @param estimateReadSize the read size that will be checked against the available quota + * @param estimateWriteCapacityUnit the write capacity unit that will be checked against the + * available quota + * @param estimateReadCapacityUnit the read capacity unit that will be checked against the + * available quota * @throws RpcThrottlingException thrown if not enough available resources to perform operation. */ - void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize) - throws RpcThrottlingException; + void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize, + long estimateWriteCapacityUnit, long estimateReadCapacityUnit) throws RpcThrottlingException; /** * Removes the specified write and read amount from the quota. @@ -49,20 +52,23 @@ public interface QuotaLimiter { * @param writeSize the write size that will be removed from the current quota * @param readReqs the read requests that will be removed from the current quota * @param readSize the read size that will be removed from the current quota + * @param writeCapacityUnit the write capacity unit that will be removed from the current quota + * @param readCapacityUnit the read capacity unit num that will be removed from the current quota */ - void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize); + void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize, + long writeCapacityUnit, long readCapacityUnit); /** * Removes or add back some write amount to the quota. * (called at the end of an operation in case the estimate quota was off) */ - void consumeWrite(long size); + void consumeWrite(long size, long capacityUnit); /** * Removes or add back some read amount to the quota. * (called at the end of an operation in case the estimate quota was off) */ - void consumeRead(long size); + void consumeRead(long size, long capacityUnit); /** @return true if the limiter is a noop */ boolean isBypass(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java index 6bc3ce9d479..f6b5d95da3b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java @@ -57,6 +57,13 @@ public class QuotaUtil extends QuotaTableUtil { public static final String QUOTA_CONF_KEY = "hbase.quota.enabled"; private static final boolean QUOTA_ENABLED_DEFAULT = false; + public static final String READ_CAPACITY_UNIT_CONF_KEY = "hbase.quota.read.capacity.unit"; + // the default one read capacity unit is 1024 bytes (1KB) + public static final long DEFAULT_READ_CAPACITY_UNIT = 1024; + public static final String WRITE_CAPACITY_UNIT_CONF_KEY = "hbase.quota.write.capacity.unit"; + // the default one write capacity unit is 1024 bytes (1KB) + public static final long DEFAULT_WRITE_CAPACITY_UNIT = 1024; + /** Table descriptor for Quota internal table */ public static final HTableDescriptor QUOTA_TABLE_DESC = new HTableDescriptor(QUOTA_TABLE_NAME); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java index 7c21f4561ea..40e70dcadc6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java @@ -102,7 +102,7 @@ public class RegionServerRpcQuotaManager { LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter); } if (!useNoop) { - return new DefaultOperationQuota(userLimiter); + return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter); } } else { QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString()); @@ -113,7 +113,8 @@ public class RegionServerRpcQuotaManager { userLimiter + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter); } if (!useNoop) { - return new DefaultOperationQuota(userLimiter, tableLimiter, nsLimiter); + return new DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter, + tableLimiter, nsLimiter); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java index 02dffcf2856..771eed1e6bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java @@ -40,6 +40,9 @@ public class TimeBasedLimiter implements QuotaLimiter { private RateLimiter writeSizeLimiter = null; private RateLimiter readReqsLimiter = null; private RateLimiter readSizeLimiter = null; + private RateLimiter reqCapacityUnitLimiter = null; + private RateLimiter writeCapacityUnitLimiter = null; + private RateLimiter readCapacityUnitLimiter = null; private TimeBasedLimiter() { if (FixedIntervalRateLimiter.class.getName().equals( @@ -51,6 +54,9 @@ public class TimeBasedLimiter implements QuotaLimiter { writeSizeLimiter = new FixedIntervalRateLimiter(); readReqsLimiter = new FixedIntervalRateLimiter(); readSizeLimiter = new FixedIntervalRateLimiter(); + reqCapacityUnitLimiter = new FixedIntervalRateLimiter(); + writeCapacityUnitLimiter = new FixedIntervalRateLimiter(); + readCapacityUnitLimiter = new FixedIntervalRateLimiter(); } else { reqsLimiter = new AverageIntervalRateLimiter(); reqSizeLimiter = new AverageIntervalRateLimiter(); @@ -58,6 +64,9 @@ public class TimeBasedLimiter implements QuotaLimiter { writeSizeLimiter = new AverageIntervalRateLimiter(); readReqsLimiter = new AverageIntervalRateLimiter(); readSizeLimiter = new AverageIntervalRateLimiter(); + reqCapacityUnitLimiter = new AverageIntervalRateLimiter(); + writeCapacityUnitLimiter = new AverageIntervalRateLimiter(); + readCapacityUnitLimiter = new AverageIntervalRateLimiter(); } } @@ -93,6 +102,21 @@ public class TimeBasedLimiter implements QuotaLimiter { setFromTimedQuota(limiter.readSizeLimiter, throttle.getReadSize()); isBypass = false; } + + if (throttle.hasReqCapacityUnit()) { + setFromTimedQuota(limiter.reqCapacityUnitLimiter, throttle.getReqCapacityUnit()); + isBypass = false; + } + + if (throttle.hasWriteCapacityUnit()) { + setFromTimedQuota(limiter.writeCapacityUnitLimiter, throttle.getWriteCapacityUnit()); + isBypass = false; + } + + if (throttle.hasReadCapacityUnit()) { + setFromTimedQuota(limiter.readCapacityUnitLimiter, throttle.getReadCapacityUnit()); + isBypass = false; + } return isBypass ? NoopQuotaLimiter.get() : limiter; } @@ -103,6 +127,9 @@ public class TimeBasedLimiter implements QuotaLimiter { writeSizeLimiter.update(other.writeSizeLimiter); readReqsLimiter.update(other.readReqsLimiter); readSizeLimiter.update(other.readSizeLimiter); + reqCapacityUnitLimiter.update(other.reqCapacityUnitLimiter); + writeCapacityUnitLimiter.update(other.writeCapacityUnitLimiter); + readCapacityUnitLimiter.update(other.readCapacityUnitLimiter); } private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuota timedQuota) { @@ -111,7 +138,8 @@ public class TimeBasedLimiter implements QuotaLimiter { @Override public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, - long estimateReadSize) throws RpcThrottlingException { + long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit) + throws RpcThrottlingException { if (!reqsLimiter.canExecute(writeReqs + readReqs)) { RpcThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval()); } @@ -119,6 +147,10 @@ public class TimeBasedLimiter implements QuotaLimiter { RpcThrottlingException.throwRequestSizeExceeded( reqSizeLimiter.waitInterval(estimateWriteSize + estimateReadSize)); } + if (!reqCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit + estimateReadCapacityUnit)) { + RpcThrottlingException.throwRequestSizeExceeded( + reqCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit + estimateReadCapacityUnit)); + } if (estimateWriteSize > 0) { if (!writeReqsLimiter.canExecute(writeReqs)) { @@ -128,6 +160,10 @@ public class TimeBasedLimiter implements QuotaLimiter { RpcThrottlingException.throwWriteSizeExceeded( writeSizeLimiter.waitInterval(estimateWriteSize)); } + if (!writeCapacityUnitLimiter.canExecute(estimateWriteCapacityUnit)) { + RpcThrottlingException.throwWriteSizeExceeded( + writeCapacityUnitLimiter.waitInterval(estimateWriteCapacityUnit)); + } } if (estimateReadSize > 0) { @@ -138,11 +174,16 @@ public class TimeBasedLimiter implements QuotaLimiter { RpcThrottlingException.throwReadSizeExceeded( readSizeLimiter.waitInterval(estimateReadSize)); } + if (!readCapacityUnitLimiter.canExecute(estimateReadCapacityUnit)) { + RpcThrottlingException + .throwWriteSizeExceeded(readCapacityUnitLimiter.waitInterval(estimateReadCapacityUnit)); + } } } @Override - public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize) { + public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize, + long writeCapacityUnit, long readCapacityUnit) { assert writeSize != 0 || readSize != 0; reqsLimiter.consume(writeReqs + readReqs); @@ -156,18 +197,30 @@ public class TimeBasedLimiter implements QuotaLimiter { readReqsLimiter.consume(readReqs); readSizeLimiter.consume(readSize); } + if (writeCapacityUnit > 0) { + reqCapacityUnitLimiter.consume(writeCapacityUnit); + writeCapacityUnitLimiter.consume(writeCapacityUnit); + } + if (readCapacityUnit > 0) { + reqCapacityUnitLimiter.consume(readCapacityUnit); + readCapacityUnitLimiter.consume(readCapacityUnit); + } } @Override - public void consumeWrite(final long size) { + public void consumeWrite(final long size, long capacityUnit) { reqSizeLimiter.consume(size); writeSizeLimiter.consume(size); + reqCapacityUnitLimiter.consume(capacityUnit); + writeCapacityUnitLimiter.consume(capacityUnit); } @Override - public void consumeRead(final long size) { + public void consumeRead(final long size, long capacityUnit) { reqSizeLimiter.consume(size); readSizeLimiter.consume(size); + reqCapacityUnitLimiter.consume(capacityUnit); + readCapacityUnitLimiter.consume(capacityUnit); } @Override @@ -189,12 +242,33 @@ public class TimeBasedLimiter implements QuotaLimiter { public String toString() { StringBuilder builder = new StringBuilder(); builder.append("TimeBasedLimiter("); - if (!reqsLimiter.isBypass()) builder.append("reqs=" + reqsLimiter); - if (!reqSizeLimiter.isBypass()) builder.append(" resSize=" + reqSizeLimiter); - if (!writeReqsLimiter.isBypass()) builder.append(" writeReqs=" + writeReqsLimiter); - if (!writeSizeLimiter.isBypass()) builder.append(" writeSize=" + writeSizeLimiter); - if (!readReqsLimiter.isBypass()) builder.append(" readReqs=" + readReqsLimiter); - if (!readSizeLimiter.isBypass()) builder.append(" readSize=" + readSizeLimiter); + if (!reqsLimiter.isBypass()) { + builder.append("reqs=" + reqsLimiter); + } + if (!reqSizeLimiter.isBypass()) { + builder.append(" resSize=" + reqSizeLimiter); + } + if (!writeReqsLimiter.isBypass()) { + builder.append(" writeReqs=" + writeReqsLimiter); + } + if (!writeSizeLimiter.isBypass()) { + builder.append(" writeSize=" + writeSizeLimiter); + } + if (!readReqsLimiter.isBypass()) { + builder.append(" readReqs=" + readReqsLimiter); + } + if (!readSizeLimiter.isBypass()) { + builder.append(" readSize=" + readSizeLimiter); + } + if (!reqCapacityUnitLimiter.isBypass()) { + builder.append(" reqCapacityUnit=" + reqCapacityUnitLimiter); + } + if (!writeCapacityUnitLimiter.isBypass()) { + builder.append(" writeCapacityUnit=" + writeCapacityUnitLimiter); + } + if (!readCapacityUnitLimiter.isBypass()) { + builder.append(" readCapacityUnit=" + readCapacityUnitLimiter); + } builder.append(')'); return builder.toString(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java index b84dc83e219..03e0aa590ed 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java @@ -455,17 +455,22 @@ public class TestQuotaAdmin { @Test public void testSetGetRemoveRPCQuota() throws Exception { + testSetGetRemoveRPCQuota(ThrottleType.REQUEST_SIZE); + testSetGetRemoveRPCQuota(ThrottleType.REQUEST_CAPACITY_UNIT); + } + + private void testSetGetRemoveRPCQuota(ThrottleType throttleType) throws Exception { Admin admin = TEST_UTIL.getAdmin(); final TableName tn = TableName.valueOf("sq_table1"); QuotaSettings settings = - QuotaSettingsFactory.throttleTable(tn, ThrottleType.REQUEST_SIZE, 2L, TimeUnit.HOURS); + QuotaSettingsFactory.throttleTable(tn, throttleType, 2L, TimeUnit.HOURS); admin.setQuota(settings); // Verify the Quota in the table - verifyRecordPresentInQuotaTable(ThrottleType.REQUEST_SIZE, 2L, TimeUnit.HOURS); + verifyRecordPresentInQuotaTable(throttleType, 2L, TimeUnit.HOURS); // Verify we can retrieve it via the QuotaRetriever API - verifyFetchableViaAPI(admin, ThrottleType.REQUEST_SIZE, 2L, TimeUnit.HOURS); + verifyFetchableViaAPI(admin, throttleType, 2L, TimeUnit.HOURS); // Now, remove the quota QuotaSettings removeQuota = QuotaSettingsFactory.unthrottleTable(tn); @@ -584,6 +589,19 @@ public class TestQuotaAdmin { assertTrue(rpcQuota.hasWriteSize()); t = rpcQuota.getWriteSize(); break; + case REQUEST_CAPACITY_UNIT: + assertTrue(rpcQuota.hasReqCapacityUnit()); + t = rpcQuota.getReqCapacityUnit(); + break; + case READ_CAPACITY_UNIT: + assertTrue(rpcQuota.hasReadCapacityUnit()); + t = rpcQuota.getReadCapacityUnit(); + break; + case WRITE_CAPACITY_UNIT: + assertTrue(rpcQuota.hasWriteCapacityUnit()); + t = rpcQuota.getWriteCapacityUnit(); + break; + default: } assertEquals(t.getSoftLimit(), limit); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java index 0cbc445889b..73b253c9b0f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java @@ -224,7 +224,7 @@ public class TestQuotaState { assertFalse(quotaInfo.isBypass()); QuotaLimiter limiter = quotaInfo.getTableLimiter(TABLE_A); try { - limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0); + limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0, 1, 0); fail("Should have thrown RpcThrottlingException"); } catch (RpcThrottlingException e) { // expected @@ -242,7 +242,7 @@ public class TestQuotaState { private void assertThrottleException(final QuotaLimiter limiter, final int availReqs) { assertNoThrottleException(limiter, availReqs); try { - limiter.checkQuota(1, 1, 0, 0); + limiter.checkQuota(1, 1, 0, 0, 1, 0); fail("Should have thrown RpcThrottlingException"); } catch (RpcThrottlingException e) { // expected @@ -252,11 +252,11 @@ public class TestQuotaState { private void assertNoThrottleException(final QuotaLimiter limiter, final int availReqs) { for (int i = 0; i < availReqs; ++i) { try { - limiter.checkQuota(1, 1, 0, 0); + limiter.checkQuota(1, 1, 0, 0, 1, 0); } catch (RpcThrottlingException e) { fail("Unexpected RpcThrottlingException after " + i + " requests. limit=" + availReqs); } - limiter.grabQuota(1, 1, 0, 0); + limiter.grabQuota(1, 1, 0, 0, 1, 0); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java index 59ba3222d36..e506a083ff5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaThrottle.java @@ -509,13 +509,67 @@ public class TestQuotaThrottle { assertEquals(30, doGets(30, tables[1])); } + @Test + public void testTableWriteCapacityUnitThrottle() throws Exception { + final Admin admin = TEST_UTIL.getAdmin(); + + // Add 6CU/min limit + admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0], + ThrottleType.WRITE_CAPACITY_UNIT, 6, TimeUnit.MINUTES)); + triggerTableCacheRefresh(false, TABLE_NAMES[0]); + + // should execute at max 6 capacity units because each put size is 1 capacity unit + assertEquals(6, doPuts(20, 10, tables[0])); + + // wait a minute and you should execute at max 3 capacity units because each put size is 2 + // capacity unit + waitMinuteQuota(); + assertEquals(3, doPuts(20, 1025, tables[0])); + + admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0])); + triggerTableCacheRefresh(true, TABLE_NAMES[0]); + } + + @Test + public void testTableReadCapacityUnitThrottle() throws Exception { + final Admin admin = TEST_UTIL.getAdmin(); + + // Add 6CU/min limit + admin.setQuota(QuotaSettingsFactory.throttleTable(TABLE_NAMES[0], + ThrottleType.READ_CAPACITY_UNIT, 6, TimeUnit.MINUTES)); + triggerTableCacheRefresh(false, TABLE_NAMES[0]); + + assertEquals(20, doPuts(20, 10, tables[0])); + // should execute at max 6 capacity units because each get size is 1 capacity unit + assertEquals(6, doGets(20, tables[0])); + + assertEquals(20, doPuts(20, 2015, tables[0])); + // wait a minute and you should execute at max 3 capacity units because each get size is 2 + // capacity unit on tables[0] + waitMinuteQuota(); + assertEquals(3, doGets(20, tables[0])); + + admin.setQuota(QuotaSettingsFactory.unthrottleTable(TABLE_NAMES[0])); + triggerTableCacheRefresh(true, TABLE_NAMES[0]); + } + private int doPuts(int maxOps, final Table... tables) throws Exception { + return doPuts(maxOps, -1, tables); + } + + private int doPuts(int maxOps, int valueSize, final Table... tables) throws Exception { int count = 0; try { while (count < maxOps) { Put put = new Put(Bytes.toBytes("row-" + count)); - put.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("data-" + count)); - for (final Table table: tables) { + byte[] value; + if (valueSize < 0) { + value = Bytes.toBytes("data-" + count); + } else { + value = generateValue(valueSize); + } + put.addColumn(FAMILY, QUALIFIER, value); + for (final Table table : tables) { table.put(put); } count += tables.length; @@ -526,6 +580,14 @@ public class TestQuotaThrottle { return count; } + private byte[] generateValue(int valueSize) { + byte[] bytes = new byte[valueSize]; + for (int i = 0; i < valueSize; i++) { + bytes[i] = 'a'; + } + return bytes; + } + private long doGets(int maxOps, final Table... tables) throws Exception { int count = 0; try { diff --git a/hbase-shell/src/main/ruby/hbase/quotas.rb b/hbase-shell/src/main/ruby/hbase/quotas.rb index 054b57a6681..ea58dae86b2 100644 --- a/hbase-shell/src/main/ruby/hbase/quotas.rb +++ b/hbase-shell/src/main/ruby/hbase/quotas.rb @@ -263,11 +263,14 @@ module Hbase def _parse_limit(str_limit, type_cls, type) str_limit = str_limit.downcase - match = /(\d+)(req|[bkmgtp])\/(sec|min|hour|day)/.match(str_limit) + match = /(\d+)(req|cu|[bkmgtp])\/(sec|min|hour|day)/.match(str_limit) if match if match[2] == 'req' limit = match[1].to_i type = type_cls.valueOf(type + '_NUMBER') + elsif match[2] == 'cu' + limit = match[1].to_i + type = type_cls.valueOf(type + '_CAPACITY_UNIT') else limit = _size_from_str(match[1].to_i, match[2]) type = type_cls.valueOf(type + '_SIZE') diff --git a/hbase-shell/src/main/ruby/shell/commands/set_quota.rb b/hbase-shell/src/main/ruby/shell/commands/set_quota.rb index ed593b65d11..3a5c136fa5b 100644 --- a/hbase-shell/src/main/ruby/shell/commands/set_quota.rb +++ b/hbase-shell/src/main/ruby/shell/commands/set_quota.rb @@ -26,11 +26,12 @@ Set a quota for a user, table, or namespace. Syntax : set_quota TYPE => , TYPE => THROTTLE -User can either set quota on read, write or on both the requests together(i.e., read+write) +User can either set quota on read, write or on both the requests together(i.e., read+write). The read, write, or read+write(default throttle type) request limit can be expressed using -the form 100req/sec, 100req/min and the read, write, read+write(default throttle type) limit +the form 100req/sec, 100req/min; the read, write, read+write(default throttle type) limit can be expressed using the form 100k/sec, 100M/min with (B, K, M, G, T, P) as valid size unit -and (sec, min, hour, day) as valid time unit. +; the read, write, read+write(default throttle type) limit can be expressed using the form +100CU/sec as capacity unit. The valid time units are (sec, min, hour, day). Currently the throttle limit is per machine - a limit of 100req/min means that each machine can execute 100req/min. @@ -42,6 +43,9 @@ For example: hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10M/sec' hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => WRITE, USER => 'u1', LIMIT => '10M/sec' + hbase> set_quota TYPE => THROTTLE, USER => 'u1', LIMIT => '10CU/sec' + hbase> set_quota TYPE => THROTTLE, THROTTLE_TYPE => WRITE, USER => 'u1', LIMIT => '10CU/sec' + hbase> set_quota TYPE => THROTTLE, USER => 'u1', TABLE => 't2', LIMIT => '5K/min' hbase> set_quota TYPE => THROTTLE, USER => 'u1', NAMESPACE => 'ns2', LIMIT => NONE diff --git a/hbase-shell/src/test/ruby/hbase/quotas_test.rb b/hbase-shell/src/test/ruby/hbase/quotas_test.rb index 3fb00c8a541..be6b238a525 100644 --- a/hbase-shell/src/test/ruby/hbase/quotas_test.rb +++ b/hbase-shell/src/test/ruby/hbase/quotas_test.rb @@ -136,5 +136,32 @@ module Hbase assert(output.include? snapshot1) assert(output.include? snapshot2) end + + define_test 'can set and remove user CU quota' do + command(:set_quota, TYPE => THROTTLE, USER => 'user1', LIMIT => '1CU/sec') + output = capture_stdout{ command(:list_quotas) } + assert(output.include?('USER => user1')) + assert(output.include?('TYPE => THROTTLE')) + assert(output.include?('THROTTLE_TYPE => REQUEST_CAPACITY_UNIT')) + assert(output.include?('LIMIT => 1CU/sec')) + + command(:set_quota, TYPE => THROTTLE, USER => 'user1', LIMIT => NONE) + output = capture_stdout{ command(:list_quotas) } + assert(output.include?('0 row(s)')) + end + + define_test 'can set and remove table CU quota' do + command(:set_quota, TYPE => THROTTLE, TABLE => @test_name, + THROTTLE_TYPE => WRITE, LIMIT => '2CU/min') + output = capture_stdout{ command(:list_quotas) } + assert(output.include?('TABLE => hbase_shell_quota_tests_table')) + assert(output.include?('TYPE => THROTTLE')) + assert(output.include?('THROTTLE_TYPE => WRITE_CAPACITY_UNIT')) + assert(output.include?('LIMIT => 2CU/min')) + + command(:set_quota, TYPE => THROTTLE, TABLE => @test_name, LIMIT => NONE) + output = capture_stdout{ command(:list_quotas) } + assert(output.include?('0 row(s)')) + end end end