diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index 80b39a86da3..1265a4223a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -69,13 +69,13 @@ public class DefaultOperationQuota implements OperationQuota { for (final QuotaLimiter limiter: limiters) { if (limiter.isBypass()) continue; - limiter.checkQuota(writeConsumed, readConsumed); + limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed); readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); writeAvailable = Math.min(writeAvailable, limiter.getWriteAvailable()); } for (final QuotaLimiter limiter: limiters) { - limiter.grabQuota(writeConsumed, readConsumed); + limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java index acfdc5285a2..3cca9559c13 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java @@ -35,13 +35,13 @@ class NoopQuotaLimiter implements QuotaLimiter { } @Override - public void checkQuota(long estimateWriteSize, long estimateReadSize) - throws RpcThrottlingException { + public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, + long estimateReadSize) throws RpcThrottlingException { // no-op } @Override - public void grabQuota(long writeSize, long readSize) { + public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize) { // no-op } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java index 1144aec1a61..7cb29b31547 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java @@ -31,22 +31,26 @@ public interface QuotaLimiter { /** * Checks if it is possible to execute the specified operation. * + * @param writeReqs the write requests that will be checked against the available quota * @param estimateWriteSize the write size that will be checked against the available quota + * @param readReqs the read requests that will be checked against the available quota * @param estimateReadSize the read size that will be checked against the available quota - * @throws RpcThrottlingException thrown if not enough avialable resources to perform operation. + * @throws RpcThrottlingException thrown if not enough available resources to perform operation. */ - void checkQuota(long estimateWriteSize, long estimateReadSize) - throws RpcThrottlingException; + void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize) + throws RpcThrottlingException; /** * Removes the specified write and read amount from the quota. * At this point the write and read amount will be an estimate, * that will be later adjusted with a consumeWrite()/consumeRead() call. * + * @param writeReqs the write requests that will be removed from the current quota * @param writeSize the write size that will be removed from the current quota + * @param readReqs the read requests that will be removed from the current quota * @param readSize the read size that will be removed from the current quota */ - void grabQuota(long writeSize, long readSize); + void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize); /** * Removes or add back some write amount to the quota. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java index 12bee809277..02dffcf2856 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java @@ -110,47 +110,50 @@ public class TimeBasedLimiter implements QuotaLimiter { } @Override - public void checkQuota(long writeSize, long readSize) throws RpcThrottlingException { - if (!reqsLimiter.canExecute()) { + public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, + long estimateReadSize) throws RpcThrottlingException { + if (!reqsLimiter.canExecute(writeReqs + readReqs)) { RpcThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval()); } - if (!reqSizeLimiter.canExecute(writeSize + readSize)) { - RpcThrottlingException.throwRequestSizeExceeded(reqSizeLimiter - .waitInterval(writeSize + readSize)); + if (!reqSizeLimiter.canExecute(estimateWriteSize + estimateReadSize)) { + RpcThrottlingException.throwRequestSizeExceeded( + reqSizeLimiter.waitInterval(estimateWriteSize + estimateReadSize)); } - if (writeSize > 0) { - if (!writeReqsLimiter.canExecute()) { + if (estimateWriteSize > 0) { + if (!writeReqsLimiter.canExecute(writeReqs)) { RpcThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval()); } - if (!writeSizeLimiter.canExecute(writeSize)) { - RpcThrottlingException.throwWriteSizeExceeded(writeSizeLimiter.waitInterval(writeSize)); + if (!writeSizeLimiter.canExecute(estimateWriteSize)) { + RpcThrottlingException.throwWriteSizeExceeded( + writeSizeLimiter.waitInterval(estimateWriteSize)); } } - if (readSize > 0) { - if (!readReqsLimiter.canExecute()) { + if (estimateReadSize > 0) { + if (!readReqsLimiter.canExecute(readReqs)) { RpcThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval()); } - if (!readSizeLimiter.canExecute(readSize)) { - RpcThrottlingException.throwReadSizeExceeded(readSizeLimiter.waitInterval(readSize)); + if (!readSizeLimiter.canExecute(estimateReadSize)) { + RpcThrottlingException.throwReadSizeExceeded( + readSizeLimiter.waitInterval(estimateReadSize)); } } } @Override - public void grabQuota(long writeSize, long readSize) { + public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize) { assert writeSize != 0 || readSize != 0; - reqsLimiter.consume(1); + reqsLimiter.consume(writeReqs + readReqs); reqSizeLimiter.consume(writeSize + readSize); if (writeSize > 0) { - writeReqsLimiter.consume(1); + writeReqsLimiter.consume(writeReqs); writeSizeLimiter.consume(writeSize); } if (readSize > 0) { - readReqsLimiter.consume(1); + readReqsLimiter.consume(readReqs); readSizeLimiter.consume(readSize); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java index 8a77e0e291b..0cbc445889b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java @@ -203,6 +203,34 @@ public class TestQuotaState { assertNoopLimiter(quotaInfo.getTableLimiter(UNKNOWN_TABLE_NAME)); } + @Test(timeout = 60000) + public void testTableThrottleWithBatch() { + final TableName TABLE_A = TableName.valueOf("TableA"); + final int TABLE_A_THROTTLE_1 = 3; + final long LAST_UPDATE_1 = 10; + + UserQuotaState quotaInfo = new UserQuotaState(); + assertEquals(0, quotaInfo.getLastUpdate()); + assertTrue(quotaInfo.isBypass()); + + // Add A table limiters + UserQuotaState otherQuotaState = new UserQuotaState(LAST_UPDATE_1); + otherQuotaState.setQuotas(TABLE_A, buildReqNumThrottle(TABLE_A_THROTTLE_1)); + assertEquals(LAST_UPDATE_1, otherQuotaState.getLastUpdate()); + assertFalse(otherQuotaState.isBypass()); + + quotaInfo.update(otherQuotaState); + assertEquals(LAST_UPDATE_1, quotaInfo.getLastUpdate()); + assertFalse(quotaInfo.isBypass()); + QuotaLimiter limiter = quotaInfo.getTableLimiter(TABLE_A); + try { + limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0); + fail("Should have thrown RpcThrottlingException"); + } catch (RpcThrottlingException e) { + // expected + } + } + private Quotas buildReqNumThrottle(final long limit) { return Quotas.newBuilder() .setThrottle(Throttle.newBuilder() @@ -214,8 +242,8 @@ public class TestQuotaState { private void assertThrottleException(final QuotaLimiter limiter, final int availReqs) { assertNoThrottleException(limiter, availReqs); try { - limiter.checkQuota(1, 1); - fail("Should have thrown ThrottlingException"); + limiter.checkQuota(1, 1, 0, 0); + fail("Should have thrown RpcThrottlingException"); } catch (RpcThrottlingException e) { // expected } @@ -224,11 +252,11 @@ public class TestQuotaState { private void assertNoThrottleException(final QuotaLimiter limiter, final int availReqs) { for (int i = 0; i < availReqs; ++i) { try { - limiter.checkQuota(1, 1); + limiter.checkQuota(1, 1, 0, 0); } catch (RpcThrottlingException e) { - fail("Unexpected ThrottlingException after " + i + " requests. limit=" + availReqs); + fail("Unexpected RpcThrottlingException after " + i + " requests. limit=" + availReqs); } - limiter.grabQuota(1, 1); + limiter.grabQuota(1, 1, 0, 0); } }