From 66038b8c1a4ad775fdb16c7a70ceb3d86ff149bb Mon Sep 17 00:00:00 2001 From: Huaxiang Sun Date: Mon, 10 Oct 2016 14:12:03 -0700 Subject: [PATCH] HBASE-16699 Overflows in AverageIntervalRateLimiter's refill() and getWaitInterval() Signed-off-by: Matteo Bertozzi --- .../quotas/AverageIntervalRateLimiter.java | 20 ++- .../hbase/quotas/DefaultOperationQuota.java | 47 ++----- .../hbase/quotas/NoopOperationQuota.java | 5 - .../hadoop/hbase/quotas/NoopQuotaLimiter.java | 9 -- .../hadoop/hbase/quotas/OperationQuota.java | 59 +------- .../hadoop/hbase/quotas/QuotaLimiter.java | 9 -- .../hadoop/hbase/quotas/RateLimiter.java | 38 ++++- .../hadoop/hbase/quotas/TimeBasedLimiter.java | 13 -- .../hadoop/hbase/quotas/TestRateLimiter.java | 130 ++++++++++++++++++ 9 files changed, 196 insertions(+), 134 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java index 75e6aea2744..9320d7c0256 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java @@ -34,12 +34,21 @@ public class AverageIntervalRateLimiter extends RateLimiter { return limit; } - long delta = (limit * (now - nextRefillTime)) / super.getTimeUnitInMillis(); + long timeInterval = now - nextRefillTime; + long delta = 0; + long timeUnitInMillis = super.getTimeUnitInMillis(); + if (timeInterval >= timeUnitInMillis) { + delta = limit; + } else if (timeInterval > 0) { + double r = ((double)timeInterval / (double)timeUnitInMillis) * limit; + delta = (long)r; + } + if (delta > 0) { this.nextRefillTime = now; - return Math.min(limit, delta); } - return 0; + + return delta; } @Override @@ -47,8 +56,9 @@ public class AverageIntervalRateLimiter extends RateLimiter { if (nextRefillTime == -1) { return 0; } - long timeUnitInMillis = super.getTimeUnitInMillis(); - return ((amount * timeUnitInMillis) / limit) - ((available * timeUnitInMillis) / limit); + + double r = ((double)(amount - available)) * super.getTimeUnitInMillis() / limit; + return (long)r; } // This method is for strictly testing purpose only diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index 654e8fabd24..6caac7440c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -31,8 +31,7 @@ public class DefaultOperationQuota implements OperationQuota { private long readAvailable = 0; private long writeConsumed = 0; private long readConsumed = 0; - - private AvgOperationSize avgOpSize = new AvgOperationSize(); + private final long[] operationSize; public DefaultOperationQuota(final QuotaLimiter... limiters) { this(Arrays.asList(limiters)); @@ -43,6 +42,12 @@ public class DefaultOperationQuota implements OperationQuota { */ public DefaultOperationQuota(final List limiters) { this.limiters = limiters; + int size = OperationType.values().length; + operationSize = new long[size]; + + for (int i = 0; i < size; ++i) { + operationSize[i] = 0; + } } @Override @@ -68,22 +73,12 @@ public class DefaultOperationQuota implements OperationQuota { @Override public void close() { - // Calculate and set the average size of get, scan and mutate for the current operation - long getSize = avgOpSize.getAvgOperationSize(OperationType.GET); - long scanSize = avgOpSize.getAvgOperationSize(OperationType.SCAN); - long mutationSize = avgOpSize.getAvgOperationSize(OperationType.MUTATE); - for (final QuotaLimiter limiter : limiters) { - limiter.addOperationSize(OperationType.GET, getSize); - limiter.addOperationSize(OperationType.SCAN, scanSize); - limiter.addOperationSize(OperationType.MUTATE, mutationSize); - } - // Adjust the quota consumed for the specified operation - long writeDiff = avgOpSize.getOperationSize(OperationType.MUTATE) - writeConsumed; - long readDiff = - (avgOpSize.getOperationSize(OperationType.GET) + avgOpSize - .getOperationSize(OperationType.SCAN)) - readConsumed; - for (final QuotaLimiter limiter : limiters) { + long writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed; + long readDiff = operationSize[OperationType.GET.ordinal()] + + operationSize[OperationType.SCAN.ordinal()] - readConsumed; + + for (final QuotaLimiter limiter: limiters) { if (writeDiff != 0) limiter.consumeWrite(writeDiff); if (readDiff != 0) limiter.consumeRead(readDiff); } @@ -101,33 +96,21 @@ public class DefaultOperationQuota implements OperationQuota { @Override public void addGetResult(final Result result) { - avgOpSize.addGetResult(result); + operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result); } @Override public void addScanResult(final List results) { - avgOpSize.addScanResult(results); + operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results); } @Override public void addMutation(final Mutation mutation) { - avgOpSize.addMutation(mutation); - } - - @Override - public long getAvgOperationSize(OperationType type) { - return avgOpSize.getAvgOperationSize(type); + operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation); } private long estimateConsume(final OperationType type, int numReqs, long avgSize) { if (numReqs > 0) { - for (final QuotaLimiter limiter : limiters) { - long size = limiter.getAvgOperationSize(type); - if (size > 0) { - avgSize = size; - break; - } - } return avgSize * numReqs; } return 0; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java index 2463ef76d24..9d37141a224 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java @@ -68,9 +68,4 @@ final class NoopOperationQuota implements OperationQuota { public long getWriteAvailable() { return Long.MAX_VALUE; } - - @Override - public long getAvgOperationSize(OperationType type) { - return -1; - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java index 699fd1a5e95..e72f3d23e80 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java @@ -62,15 +62,6 @@ final class NoopQuotaLimiter implements QuotaLimiter { throw new UnsupportedOperationException(); } - @Override - public void addOperationSize(OperationType type, long size) { - } - - @Override - public long getAvgOperationSize(OperationType type) { - return -1; - } - @Override public String toString() { return "NoopQuotaLimiter"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java index 6010c131aa4..ee38256ef35 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java @@ -29,58 +29,10 @@ public interface OperationQuota { } /** - * Keeps track of the average data size of operations like get, scan, mutate - */ - public class AvgOperationSize { - private final long[] sizeSum; - private final long[] count; - - public AvgOperationSize() { - int size = OperationType.values().length; - sizeSum = new long[size]; - count = new long[size]; - for (int i = 0; i < size; ++i) { - sizeSum[i] = 0; - count[i] = 0; - } - } - - public void addOperationSize(OperationType type, long size) { - if (size > 0) { - int index = type.ordinal(); - sizeSum[index] += size; - count[index]++; - } - } - - public long getAvgOperationSize(OperationType type) { - int index = type.ordinal(); - return count[index] > 0 ? sizeSum[index] / count[index] : 0; - } - - public long getOperationSize(OperationType type) { - return sizeSum[type.ordinal()]; - } - - public void addGetResult(final Result result) { - long size = QuotaUtil.calculateResultSize(result); - addOperationSize(OperationType.GET, size); - } - - public void addScanResult(final List results) { - long size = QuotaUtil.calculateResultSize(results); - addOperationSize(OperationType.SCAN, size); - } - - public void addMutation(final Mutation mutation) { - long size = QuotaUtil.calculateMutationSize(mutation); - addOperationSize(OperationType.MUTATE, size); - } - } - - /** - * Checks if it is possible to execute the specified operation. The quota will be estimated based - * on the number of operations to perform and the average size accumulated during time. + * Checks if it is possible to execute the specified operation. + * The quota will be estimated based on the number of operations to perform + * and the average size accumulated during time. + * * @param numWrites number of write operation that will be performed * @param numReads number of small-read operation that will be performed * @param numScans number of long-read operation that will be performed @@ -114,7 +66,4 @@ public interface OperationQuota { /** @return the number of bytes available to write to avoid exceeding the quota */ long getWriteAvailable(); - - /** @return the average data size of the specified operation */ - long getAvgOperationSize(OperationType type); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java index ffacbc0b09c..a27211c5fa6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java @@ -68,13 +68,4 @@ public interface QuotaLimiter { /** @return the number of bytes available to write to avoid exceeding the quota */ long getWriteAvailable(); - - /** - * Add the average size of the specified operation type. - * The average will be used as estimate for the next operations. - */ - void addOperationSize(OperationType type, long size); - - /** @return the average data size of the specified operation */ - long getAvgOperationSize(OperationType type); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java index 3b407bd5dfe..356afd315e3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java @@ -111,7 +111,15 @@ public abstract class RateLimiter { public synchronized void update(final RateLimiter other) { this.tunit = other.tunit; if (this.limit < other.limit) { - this.avail += (other.limit - this.limit); + // If avail is capped to this.limit, it will never overflow, + // otherwise, avail may overflow, just be careful here. + long diff = other.limit - this.limit; + if (this.avail <= Long.MAX_VALUE - diff) { + this.avail += diff; + this.avail = Math.min(this.avail, other.limit); + } else { + this.avail = other.limit; + } } this.limit = other.limit; } @@ -142,10 +150,14 @@ public abstract class RateLimiter { /** * Are there enough available resources to allow execution? - * @param amount the number of required resources + * @param amount the number of required resources, a non-negative number * @return true if there are enough available resources, otherwise false */ public synchronized boolean canExecute(final long amount) { + if (isBypass()) { + return true; + } + long refillAmount = refill(limit); if (refillAmount == 0 && avail < amount) { return false; @@ -170,13 +182,27 @@ public abstract class RateLimiter { } /** - * consume amount available units. + * consume amount available units, amount could be a negative number * @param amount the number of units to consume */ public synchronized void consume(final long amount) { - this.avail -= amount; - if (this.avail < 0) { - this.avail = 0; + + if (isBypass()) { + return; + } + + if (amount >= 0 ) { + this.avail -= amount; + if (this.avail < 0) { + this.avail = 0; + } + } else { + if (this.avail <= Long.MAX_VALUE + amount) { + this.avail -= amount; + this.avail = Math.min(this.avail, this.limit); + } else { + this.avail = this.limit; + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java index beb4147643a..156387862bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java @@ -26,8 +26,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle; import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota; -import org.apache.hadoop.hbase.quotas.OperationQuota.AvgOperationSize; -import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType; /** * Simple time based limiter that checks the quota Throttle @@ -42,7 +40,6 @@ public class TimeBasedLimiter implements QuotaLimiter { private RateLimiter writeSizeLimiter = null; private RateLimiter readReqsLimiter = null; private RateLimiter readSizeLimiter = null; - private AvgOperationSize avgOpSize = new AvgOperationSize(); private TimeBasedLimiter() { if (FixedIntervalRateLimiter.class.getName().equals( @@ -185,16 +182,6 @@ public class TimeBasedLimiter implements QuotaLimiter { return readSizeLimiter.getAvailable(); } - @Override - public void addOperationSize(OperationType type, long size) { - avgOpSize.addOperationSize(type, size); - } - - @Override - public long getAvgOperationSize(OperationType type) { - return avgOpSize.getAvgOperationSize(type); - } - @Override public String toString() { StringBuilder builder = new StringBuilder(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java index 4c5a658092d..1ca66431f33 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java @@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -277,4 +278,133 @@ public class TestRateLimiter { limiter.setNextRefillTime(limiter.getNextRefillTime() - 5000); assertEquals(60, limiter.refill(limiter.getLimit())); } + + @Test + public void testUnconfiguredLimiters() throws InterruptedException { + + ManualEnvironmentEdge testEdge = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(testEdge); + long limit = Long.MAX_VALUE; + + // For unconfigured limiters, it is supposed to use as much as possible + RateLimiter avgLimiter = new AverageIntervalRateLimiter(); + RateLimiter fixLimiter = new FixedIntervalRateLimiter(); + + assertEquals(limit, avgLimiter.getAvailable()); + assertEquals(limit, fixLimiter.getAvailable()); + + assertTrue(avgLimiter.canExecute(limit)); + avgLimiter.consume(limit); + + assertTrue(fixLimiter.canExecute(limit)); + fixLimiter.consume(limit); + + // Make sure that available is Long.MAX_VALUE + assertTrue(limit == avgLimiter.getAvailable()); + assertTrue(limit == fixLimiter.getAvailable()); + + // after 100 millseconds, it should be able to execute limit as well + testEdge.incValue(100); + + assertTrue(avgLimiter.canExecute(limit)); + avgLimiter.consume(limit); + + assertTrue(fixLimiter.canExecute(limit)); + fixLimiter.consume(limit); + + // Make sure that available is Long.MAX_VALUE + assertTrue(limit == avgLimiter.getAvailable()); + assertTrue(limit == fixLimiter.getAvailable()); + + EnvironmentEdgeManager.reset(); + } + + @Test + public void testExtremeLimiters() throws InterruptedException { + + ManualEnvironmentEdge testEdge = new ManualEnvironmentEdge(); + EnvironmentEdgeManager.injectEdge(testEdge); + long limit = Long.MAX_VALUE - 1; + + RateLimiter avgLimiter = new AverageIntervalRateLimiter(); + avgLimiter.set(limit, TimeUnit.SECONDS); + RateLimiter fixLimiter = new FixedIntervalRateLimiter(); + fixLimiter.set(limit, TimeUnit.SECONDS); + + assertEquals(limit, avgLimiter.getAvailable()); + assertEquals(limit, fixLimiter.getAvailable()); + + assertTrue(avgLimiter.canExecute(limit / 2)); + avgLimiter.consume(limit / 2); + + assertTrue(fixLimiter.canExecute(limit / 2)); + fixLimiter.consume(limit / 2); + + // Make sure that available is whatever left + assertTrue((limit - (limit / 2)) == avgLimiter.getAvailable()); + assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable()); + + // after 100 millseconds, both should not be able to execute the limit + testEdge.incValue(100); + + assertFalse(avgLimiter.canExecute(limit)); + assertFalse(fixLimiter.canExecute(limit)); + + // after 500 millseconds, average interval limiter should be able to execute the limit + testEdge.incValue(500); + assertTrue(avgLimiter.canExecute(limit)); + assertFalse(fixLimiter.canExecute(limit)); + + // Make sure that available is correct + assertTrue(limit == avgLimiter.getAvailable()); + assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable()); + + // after 500 millseconds, both should be able to execute + testEdge.incValue(500); + assertTrue(avgLimiter.canExecute(limit)); + assertTrue(fixLimiter.canExecute(limit)); + + // Make sure that available is Long.MAX_VALUE + assertTrue(limit == avgLimiter.getAvailable()); + assertTrue(limit == fixLimiter.getAvailable()); + + EnvironmentEdgeManager.reset(); + } + + /* + * This test case is tricky. Basically, it simulates the following events: + * Thread-1 Thread-2 + * t0: canExecute(100) and consume(100) + * t1: canExecute(100), avail may be increased by 80 + * t2: consume(-80) as actual size is 20 + * It will check if consume(-80) can handle overflow correctly. + */ + @Test + public void testLimiterCompensationOverflow() throws InterruptedException { + + long limit = Long.MAX_VALUE - 1; + long guessNumber = 100; + + // For unconfigured limiters, it is supposed to use as much as possible + RateLimiter avgLimiter = new AverageIntervalRateLimiter(); + avgLimiter.set(limit, TimeUnit.SECONDS); + + assertEquals(limit, avgLimiter.getAvailable()); + + // The initial guess is that 100 bytes. + assertTrue(avgLimiter.canExecute(guessNumber)); + avgLimiter.consume(guessNumber); + + // Make sure that available is whatever left + assertTrue((limit - guessNumber) == avgLimiter.getAvailable()); + + // Manually set avil to simulate that another thread call canExecute(). + // It is simulated by consume(). + avgLimiter.consume(-80); + assertTrue((limit - guessNumber + 80) == avgLimiter.getAvailable()); + + // Now thread1 compensates 80 + avgLimiter.consume(-80); + assertTrue(limit == avgLimiter.getAvailable()); + } }