HBASE-16699 Overflows in AverageIntervalRateLimiter's refill() and getWaitInterval()

Signed-off-by: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
This commit is contained in:
Huaxiang Sun 2016-10-10 14:06:28 -07:00 committed by Matteo Bertozzi
parent c930bc92f4
commit 341f049e77
9 changed files with 191 additions and 132 deletions

View File

@ -34,12 +34,21 @@ public class AverageIntervalRateLimiter extends RateLimiter {
return limit;
}
long delta = (limit * (now - nextRefillTime)) / super.getTimeUnitInMillis();
long timeInterval = now - nextRefillTime;
long delta = 0;
long timeUnitInMillis = super.getTimeUnitInMillis();
if (timeInterval >= timeUnitInMillis) {
delta = limit;
} else if (timeInterval > 0) {
double r = ((double)timeInterval / (double)timeUnitInMillis) * limit;
delta = (long)r;
}
if (delta > 0) {
this.nextRefillTime = now;
return Math.min(limit, delta);
}
return 0;
return delta;
}
@Override
@ -47,8 +56,9 @@ public class AverageIntervalRateLimiter extends RateLimiter {
if (nextRefillTime == -1) {
return 0;
}
long timeUnitInMillis = super.getTimeUnitInMillis();
return ((amount * timeUnitInMillis) / limit) - ((available * timeUnitInMillis) / limit);
double r = ((double)(amount - available)) * super.getTimeUnitInMillis() / limit;
return (long)r;
}
// This method is for strictly testing purpose only

View File

@ -27,8 +27,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.quotas.OperationQuota.AvgOperationSize;
import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
@InterfaceAudience.Private
@InterfaceStability.Evolving
@ -40,8 +38,7 @@ public class DefaultOperationQuota implements OperationQuota {
private long readAvailable = 0;
private long writeConsumed = 0;
private long readConsumed = 0;
private AvgOperationSize avgOpSize = new AvgOperationSize();
private final long[] operationSize;
public DefaultOperationQuota(final QuotaLimiter... limiters) {
this(Arrays.asList(limiters));
@ -52,6 +49,12 @@ public class DefaultOperationQuota implements OperationQuota {
*/
public DefaultOperationQuota(final List<QuotaLimiter> limiters) {
this.limiters = limiters;
int size = OperationType.values().length;
operationSize = new long[size];
for (int i = 0; i < size; ++i) {
operationSize[i] = 0;
}
}
@Override
@ -78,20 +81,11 @@ public class DefaultOperationQuota implements OperationQuota {
@Override
public void close() {
// Calculate and set the average size of get, scan and mutate for the current operation
long getSize = avgOpSize.getAvgOperationSize(OperationType.GET);
long scanSize = avgOpSize.getAvgOperationSize(OperationType.SCAN);
long mutationSize = avgOpSize.getAvgOperationSize(OperationType.MUTATE);
for (final QuotaLimiter limiter: limiters) {
limiter.addOperationSize(OperationType.GET, getSize);
limiter.addOperationSize(OperationType.SCAN, scanSize);
limiter.addOperationSize(OperationType.MUTATE, mutationSize);
}
// Adjust the quota consumed for the specified operation
long writeDiff = avgOpSize.getOperationSize(OperationType.MUTATE) - writeConsumed;
long readDiff = (avgOpSize.getOperationSize(OperationType.GET) +
avgOpSize.getOperationSize(OperationType.SCAN)) - readConsumed;
long writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
long readDiff = operationSize[OperationType.GET.ordinal()] +
operationSize[OperationType.SCAN.ordinal()] - readConsumed;
for (final QuotaLimiter limiter: limiters) {
if (writeDiff != 0) limiter.consumeWrite(writeDiff);
if (readDiff != 0) limiter.consumeRead(readDiff);
@ -110,33 +104,21 @@ public class DefaultOperationQuota implements OperationQuota {
@Override
public void addGetResult(final Result result) {
avgOpSize.addGetResult(result);
operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result);
}
@Override
public void addScanResult(final List<Result> results) {
avgOpSize.addScanResult(results);
operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results);
}
@Override
public void addMutation(final Mutation mutation) {
avgOpSize.addMutation(mutation);
}
@Override
public long getAvgOperationSize(OperationType type) {
return avgOpSize.getAvgOperationSize(type);
operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation);
}
private long estimateConsume(final OperationType type, int numReqs, long avgSize) {
if (numReqs > 0) {
for (final QuotaLimiter limiter: limiters) {
long size = limiter.getAvgOperationSize(type);
if (size > 0) {
avgSize = size;
break;
}
}
return avgSize * numReqs;
}
return 0;

View File

@ -76,9 +76,4 @@ class NoopOperationQuota implements OperationQuota {
public long getWriteAvailable() {
return Long.MAX_VALUE;
}
@Override
public long getAvgOperationSize(OperationType type) {
return -1;
}
}

View File

@ -70,15 +70,6 @@ class NoopQuotaLimiter implements QuotaLimiter {
throw new UnsupportedOperationException();
}
@Override
public void addOperationSize(OperationType type, long size) {
}
@Override
public long getAvgOperationSize(OperationType type) {
return -1;
}
@Override
public String toString() {
return "NoopQuotaLimiter";

View File

@ -33,56 +33,6 @@ import org.apache.hadoop.hbase.client.Result;
public interface OperationQuota {
public enum OperationType { MUTATE, GET, SCAN }
/**
* Keeps track of the average data size of operations like get, scan, mutate
*/
public class AvgOperationSize {
private final long[] sizeSum;
private final long[] count;
public AvgOperationSize() {
int size = OperationType.values().length;
sizeSum = new long[size];
count = new long[size];
for (int i = 0; i < size; ++i) {
sizeSum[i] = 0;
count[i] = 0;
}
}
public void addOperationSize(OperationType type, long size) {
if (size > 0) {
int index = type.ordinal();
sizeSum[index] += size;
count[index]++;
}
}
public long getAvgOperationSize(OperationType type) {
int index = type.ordinal();
return count[index] > 0 ? sizeSum[index] / count[index] : 0;
}
public long getOperationSize(OperationType type) {
return sizeSum[type.ordinal()];
}
public void addGetResult(final Result result) {
long size = QuotaUtil.calculateResultSize(result);
addOperationSize(OperationType.GET, size);
}
public void addScanResult(final List<Result> results) {
long size = QuotaUtil.calculateResultSize(results);
addOperationSize(OperationType.SCAN, size);
}
public void addMutation(final Mutation mutation) {
long size = QuotaUtil.calculateMutationSize(mutation);
addOperationSize(OperationType.MUTATE, size);
}
}
/**
* Checks if it is possible to execute the specified operation.
* The quota will be estimated based on the number of operations to perform
@ -122,7 +72,4 @@ public interface OperationQuota {
/** @return the number of bytes available to write to avoid exceeding the quota */
long getWriteAvailable();
/** @return the average data size of the specified operation */
long getAvgOperationSize(OperationType type);
}

View File

@ -68,13 +68,4 @@ public interface QuotaLimiter {
/** @return the number of bytes available to write to avoid exceeding the quota */
long getWriteAvailable();
/**
* Add the average size of the specified operation type.
* The average will be used as estimate for the next operations.
*/
void addOperationSize(OperationType type, long size);
/** @return the average data size of the specified operation */
long getAvgOperationSize(OperationType type);
}

View File

@ -118,7 +118,15 @@ public abstract class RateLimiter {
public synchronized void update(final RateLimiter other) {
this.tunit = other.tunit;
if (this.limit < other.limit) {
this.avail += (other.limit - this.limit);
// If avail is capped to this.limit, it will never overflow,
// otherwise, avail may overflow, just be careful here.
long diff = other.limit - this.limit;
if (this.avail <= Long.MAX_VALUE - diff) {
this.avail += diff;
this.avail = Math.min(this.avail, other.limit);
} else {
this.avail = other.limit;
}
}
this.limit = other.limit;
}
@ -149,10 +157,14 @@ public abstract class RateLimiter {
/**
* Are there enough available resources to allow execution?
* @param amount the number of required resources
* @param amount the number of required resources, a non-negative number
* @return true if there are enough available resources, otherwise false
*/
public synchronized boolean canExecute(final long amount) {
if (isBypass()) {
return true;
}
long refillAmount = refill(limit);
if (refillAmount == 0 && avail < amount) {
return false;
@ -177,13 +189,27 @@ public abstract class RateLimiter {
}
/**
* consume amount available units.
* consume amount available units, amount could be a negative number
* @param amount the number of units to consume
*/
public synchronized void consume(final long amount) {
this.avail -= amount;
if (this.avail < 0) {
this.avail = 0;
if (isBypass()) {
return;
}
if (amount >= 0 ) {
this.avail -= amount;
if (this.avail < 0) {
this.avail = 0;
}
} else {
if (this.avail <= Long.MAX_VALUE + amount) {
this.avail -= amount;
this.avail = Math.min(this.avail, this.limit);
} else {
this.avail = this.limit;
}
}
}

View File

@ -26,8 +26,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.TimedQuota;
import org.apache.hadoop.hbase.quotas.OperationQuota.AvgOperationSize;
import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
/**
* Simple time based limiter that checks the quota Throttle
@ -42,7 +40,6 @@ public class TimeBasedLimiter implements QuotaLimiter {
private RateLimiter writeSizeLimiter = null;
private RateLimiter readReqsLimiter = null;
private RateLimiter readSizeLimiter = null;
private AvgOperationSize avgOpSize = new AvgOperationSize();
private TimeBasedLimiter() {
if (FixedIntervalRateLimiter.class.getName().equals(
@ -185,16 +182,6 @@ public class TimeBasedLimiter implements QuotaLimiter {
return readSizeLimiter.getAvailable();
}
@Override
public void addOperationSize(OperationType type, long size) {
avgOpSize.addOperationSize(type, size);
}
@Override
public long getAvgOperationSize(OperationType type) {
return avgOpSize.getAvgOperationSize(type);
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();

View File

@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -285,4 +286,133 @@ public class TestRateLimiter {
limiter.setNextRefillTime(limiter.getNextRefillTime() - 5000);
assertEquals(60, limiter.refill(limiter.getLimit()));
}
@Test
public void testUnconfiguredLimiters() throws InterruptedException {
ManualEnvironmentEdge testEdge = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(testEdge);
long limit = Long.MAX_VALUE;
// For unconfigured limiters, it is supposed to use as much as possible
RateLimiter avgLimiter = new AverageIntervalRateLimiter();
RateLimiter fixLimiter = new FixedIntervalRateLimiter();
assertEquals(limit, avgLimiter.getAvailable());
assertEquals(limit, fixLimiter.getAvailable());
assertTrue(avgLimiter.canExecute(limit));
avgLimiter.consume(limit);
assertTrue(fixLimiter.canExecute(limit));
fixLimiter.consume(limit);
// Make sure that available is Long.MAX_VALUE
assertTrue(limit == avgLimiter.getAvailable());
assertTrue(limit == fixLimiter.getAvailable());
// after 100 millseconds, it should be able to execute limit as well
testEdge.incValue(100);
assertTrue(avgLimiter.canExecute(limit));
avgLimiter.consume(limit);
assertTrue(fixLimiter.canExecute(limit));
fixLimiter.consume(limit);
// Make sure that available is Long.MAX_VALUE
assertTrue(limit == avgLimiter.getAvailable());
assertTrue(limit == fixLimiter.getAvailable());
EnvironmentEdgeManager.reset();
}
@Test
public void testExtremeLimiters() throws InterruptedException {
ManualEnvironmentEdge testEdge = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(testEdge);
long limit = Long.MAX_VALUE - 1;
RateLimiter avgLimiter = new AverageIntervalRateLimiter();
avgLimiter.set(limit, TimeUnit.SECONDS);
RateLimiter fixLimiter = new FixedIntervalRateLimiter();
fixLimiter.set(limit, TimeUnit.SECONDS);
assertEquals(limit, avgLimiter.getAvailable());
assertEquals(limit, fixLimiter.getAvailable());
assertTrue(avgLimiter.canExecute(limit / 2));
avgLimiter.consume(limit / 2);
assertTrue(fixLimiter.canExecute(limit / 2));
fixLimiter.consume(limit / 2);
// Make sure that available is whatever left
assertTrue((limit - (limit / 2)) == avgLimiter.getAvailable());
assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable());
// after 100 millseconds, both should not be able to execute the limit
testEdge.incValue(100);
assertFalse(avgLimiter.canExecute(limit));
assertFalse(fixLimiter.canExecute(limit));
// after 500 millseconds, average interval limiter should be able to execute the limit
testEdge.incValue(500);
assertTrue(avgLimiter.canExecute(limit));
assertFalse(fixLimiter.canExecute(limit));
// Make sure that available is correct
assertTrue(limit == avgLimiter.getAvailable());
assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable());
// after 500 millseconds, both should be able to execute
testEdge.incValue(500);
assertTrue(avgLimiter.canExecute(limit));
assertTrue(fixLimiter.canExecute(limit));
// Make sure that available is Long.MAX_VALUE
assertTrue(limit == avgLimiter.getAvailable());
assertTrue(limit == fixLimiter.getAvailable());
EnvironmentEdgeManager.reset();
}
/*
* This test case is tricky. Basically, it simulates the following events:
* Thread-1 Thread-2
* t0: canExecute(100) and consume(100)
* t1: canExecute(100), avail may be increased by 80
* t2: consume(-80) as actual size is 20
* It will check if consume(-80) can handle overflow correctly.
*/
@Test
public void testLimiterCompensationOverflow() throws InterruptedException {
long limit = Long.MAX_VALUE - 1;
long guessNumber = 100;
// For unconfigured limiters, it is supposed to use as much as possible
RateLimiter avgLimiter = new AverageIntervalRateLimiter();
avgLimiter.set(limit, TimeUnit.SECONDS);
assertEquals(limit, avgLimiter.getAvailable());
// The initial guess is that 100 bytes.
assertTrue(avgLimiter.canExecute(guessNumber));
avgLimiter.consume(guessNumber);
// Make sure that available is whatever left
assertTrue((limit - guessNumber) == avgLimiter.getAvailable());
// Manually set avil to simulate that another thread call canExecute().
// It is simulated by consume().
avgLimiter.consume(-80);
assertTrue((limit - guessNumber + 80) == avgLimiter.getAvailable());
// Now thread1 compensates 80
avgLimiter.consume(-80);
assertTrue(limit == avgLimiter.getAvailable());
}
}