HBASE-19924 hbase rpc throttling does not work for multi() with request count rater.

This commit is contained in:
Huaxiang Sun 2018-04-20 16:54:03 -07:00
parent 05f8e94191
commit 298ce96246
5 changed files with 66 additions and 31 deletions

View File

@ -69,13 +69,13 @@ public class DefaultOperationQuota implements OperationQuota {
for (final QuotaLimiter limiter: limiters) { for (final QuotaLimiter limiter: limiters) {
if (limiter.isBypass()) continue; if (limiter.isBypass()) continue;
limiter.checkQuota(writeConsumed, readConsumed); limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed);
readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
writeAvailable = Math.min(writeAvailable, limiter.getWriteAvailable()); writeAvailable = Math.min(writeAvailable, limiter.getWriteAvailable());
} }
for (final QuotaLimiter limiter: limiters) { for (final QuotaLimiter limiter: limiters) {
limiter.grabQuota(writeConsumed, readConsumed); limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed);
} }
} }

View File

@ -35,13 +35,13 @@ class NoopQuotaLimiter implements QuotaLimiter {
} }
@Override @Override
public void checkQuota(long estimateWriteSize, long estimateReadSize) public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
throws RpcThrottlingException { long estimateReadSize) throws RpcThrottlingException {
// no-op // no-op
} }
@Override @Override
public void grabQuota(long writeSize, long readSize) { public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize) {
// no-op // no-op
} }

View File

@ -31,11 +31,13 @@ public interface QuotaLimiter {
/** /**
* Checks if it is possible to execute the specified operation. * Checks if it is possible to execute the specified operation.
* *
* @param writeReqs the write requests that will be checked against the available quota
* @param estimateWriteSize the write size that will be checked against the available quota * @param estimateWriteSize the write size that will be checked against the available quota
* @param readReqs the read requests that will be checked against the available quota
* @param estimateReadSize the read size that will be checked against the available quota * @param estimateReadSize the read size that will be checked against the available quota
* @throws RpcThrottlingException thrown if not enough avialable resources to perform operation. * @throws RpcThrottlingException thrown if not enough available resources to perform operation.
*/ */
void checkQuota(long estimateWriteSize, long estimateReadSize) void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize)
throws RpcThrottlingException; throws RpcThrottlingException;
/** /**
@ -43,10 +45,12 @@ public interface QuotaLimiter {
* At this point the write and read amount will be an estimate, * At this point the write and read amount will be an estimate,
* that will be later adjusted with a consumeWrite()/consumeRead() call. * that will be later adjusted with a consumeWrite()/consumeRead() call.
* *
* @param writeReqs the write requests that will be removed from the current quota
* @param writeSize the write size that will be removed from the current quota * @param writeSize the write size that will be removed from the current quota
* @param readReqs the read requests that will be removed from the current quota
* @param readSize the read size that will be removed from the current quota * @param readSize the read size that will be removed from the current quota
*/ */
void grabQuota(long writeSize, long readSize); void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize);
/** /**
* Removes or add back some write amount to the quota. * Removes or add back some write amount to the quota.

View File

@ -110,47 +110,50 @@ public class TimeBasedLimiter implements QuotaLimiter {
} }
@Override @Override
public void checkQuota(long writeSize, long readSize) throws RpcThrottlingException { public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs,
if (!reqsLimiter.canExecute()) { long estimateReadSize) throws RpcThrottlingException {
if (!reqsLimiter.canExecute(writeReqs + readReqs)) {
RpcThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval()); RpcThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval());
} }
if (!reqSizeLimiter.canExecute(writeSize + readSize)) { if (!reqSizeLimiter.canExecute(estimateWriteSize + estimateReadSize)) {
RpcThrottlingException.throwRequestSizeExceeded(reqSizeLimiter RpcThrottlingException.throwRequestSizeExceeded(
.waitInterval(writeSize + readSize)); reqSizeLimiter.waitInterval(estimateWriteSize + estimateReadSize));
} }
if (writeSize > 0) { if (estimateWriteSize > 0) {
if (!writeReqsLimiter.canExecute()) { if (!writeReqsLimiter.canExecute(writeReqs)) {
RpcThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval()); RpcThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval());
} }
if (!writeSizeLimiter.canExecute(writeSize)) { if (!writeSizeLimiter.canExecute(estimateWriteSize)) {
RpcThrottlingException.throwWriteSizeExceeded(writeSizeLimiter.waitInterval(writeSize)); RpcThrottlingException.throwWriteSizeExceeded(
writeSizeLimiter.waitInterval(estimateWriteSize));
} }
} }
if (readSize > 0) { if (estimateReadSize > 0) {
if (!readReqsLimiter.canExecute()) { if (!readReqsLimiter.canExecute(readReqs)) {
RpcThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval()); RpcThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval());
} }
if (!readSizeLimiter.canExecute(readSize)) { if (!readSizeLimiter.canExecute(estimateReadSize)) {
RpcThrottlingException.throwReadSizeExceeded(readSizeLimiter.waitInterval(readSize)); RpcThrottlingException.throwReadSizeExceeded(
readSizeLimiter.waitInterval(estimateReadSize));
} }
} }
} }
@Override @Override
public void grabQuota(long writeSize, long readSize) { public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize) {
assert writeSize != 0 || readSize != 0; assert writeSize != 0 || readSize != 0;
reqsLimiter.consume(1); reqsLimiter.consume(writeReqs + readReqs);
reqSizeLimiter.consume(writeSize + readSize); reqSizeLimiter.consume(writeSize + readSize);
if (writeSize > 0) { if (writeSize > 0) {
writeReqsLimiter.consume(1); writeReqsLimiter.consume(writeReqs);
writeSizeLimiter.consume(writeSize); writeSizeLimiter.consume(writeSize);
} }
if (readSize > 0) { if (readSize > 0) {
readReqsLimiter.consume(1); readReqsLimiter.consume(readReqs);
readSizeLimiter.consume(readSize); readSizeLimiter.consume(readSize);
} }
} }

View File

@ -203,6 +203,34 @@ public class TestQuotaState {
assertNoopLimiter(quotaInfo.getTableLimiter(UNKNOWN_TABLE_NAME)); assertNoopLimiter(quotaInfo.getTableLimiter(UNKNOWN_TABLE_NAME));
} }
@Test(timeout = 60000)
public void testTableThrottleWithBatch() {
final TableName TABLE_A = TableName.valueOf("TableA");
final int TABLE_A_THROTTLE_1 = 3;
final long LAST_UPDATE_1 = 10;
UserQuotaState quotaInfo = new UserQuotaState();
assertEquals(0, quotaInfo.getLastUpdate());
assertTrue(quotaInfo.isBypass());
// Add A table limiters
UserQuotaState otherQuotaState = new UserQuotaState(LAST_UPDATE_1);
otherQuotaState.setQuotas(TABLE_A, buildReqNumThrottle(TABLE_A_THROTTLE_1));
assertEquals(LAST_UPDATE_1, otherQuotaState.getLastUpdate());
assertFalse(otherQuotaState.isBypass());
quotaInfo.update(otherQuotaState);
assertEquals(LAST_UPDATE_1, quotaInfo.getLastUpdate());
assertFalse(quotaInfo.isBypass());
QuotaLimiter limiter = quotaInfo.getTableLimiter(TABLE_A);
try {
limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0);
fail("Should have thrown RpcThrottlingException");
} catch (RpcThrottlingException e) {
// expected
}
}
private Quotas buildReqNumThrottle(final long limit) { private Quotas buildReqNumThrottle(final long limit) {
return Quotas.newBuilder() return Quotas.newBuilder()
.setThrottle(Throttle.newBuilder() .setThrottle(Throttle.newBuilder()
@ -214,8 +242,8 @@ public class TestQuotaState {
private void assertThrottleException(final QuotaLimiter limiter, final int availReqs) { private void assertThrottleException(final QuotaLimiter limiter, final int availReqs) {
assertNoThrottleException(limiter, availReqs); assertNoThrottleException(limiter, availReqs);
try { try {
limiter.checkQuota(1, 1); limiter.checkQuota(1, 1, 0, 0);
fail("Should have thrown ThrottlingException"); fail("Should have thrown RpcThrottlingException");
} catch (RpcThrottlingException e) { } catch (RpcThrottlingException e) {
// expected // expected
} }
@ -224,11 +252,11 @@ public class TestQuotaState {
private void assertNoThrottleException(final QuotaLimiter limiter, final int availReqs) { private void assertNoThrottleException(final QuotaLimiter limiter, final int availReqs) {
for (int i = 0; i < availReqs; ++i) { for (int i = 0; i < availReqs; ++i) {
try { try {
limiter.checkQuota(1, 1); limiter.checkQuota(1, 1, 0, 0);
} catch (RpcThrottlingException e) { } catch (RpcThrottlingException e) {
fail("Unexpected ThrottlingException after " + i + " requests. limit=" + availReqs); fail("Unexpected RpcThrottlingException after " + i + " requests. limit=" + availReqs);
} }
limiter.grabQuota(1, 1); limiter.grabQuota(1, 1, 0, 0);
} }
} }