HBASE-16699 Overflows in AverageIntervalRateLimiter's refill() and getWaitInterval()

Signed-off-by: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
This commit is contained in:
Huaxiang Sun 2016-10-10 14:12:03 -07:00 committed by Matteo Bertozzi
parent acb1392b15
commit 66038b8c1a
9 changed files with 196 additions and 134 deletions

View File

@ -34,12 +34,21 @@ public class AverageIntervalRateLimiter extends RateLimiter {
return limit; return limit;
} }
long delta = (limit * (now - nextRefillTime)) / super.getTimeUnitInMillis(); long timeInterval = now - nextRefillTime;
long delta = 0;
long timeUnitInMillis = super.getTimeUnitInMillis();
if (timeInterval >= timeUnitInMillis) {
delta = limit;
} else if (timeInterval > 0) {
double r = ((double)timeInterval / (double)timeUnitInMillis) * limit;
delta = (long)r;
}
if (delta > 0) { if (delta > 0) {
this.nextRefillTime = now; this.nextRefillTime = now;
return Math.min(limit, delta);
} }
return 0;
return delta;
} }
@Override @Override
@ -47,8 +56,9 @@ public class AverageIntervalRateLimiter extends RateLimiter {
if (nextRefillTime == -1) { if (nextRefillTime == -1) {
return 0; return 0;
} }
long timeUnitInMillis = super.getTimeUnitInMillis();
return ((amount * timeUnitInMillis) / limit) - ((available * timeUnitInMillis) / limit); double r = ((double)(amount - available)) * super.getTimeUnitInMillis() / limit;
return (long)r;
} }
// This method is for strictly testing purpose only // This method is for strictly testing purpose only

View File

@ -31,8 +31,7 @@ public class DefaultOperationQuota implements OperationQuota {
private long readAvailable = 0; private long readAvailable = 0;
private long writeConsumed = 0; private long writeConsumed = 0;
private long readConsumed = 0; private long readConsumed = 0;
private final long[] operationSize;
private AvgOperationSize avgOpSize = new AvgOperationSize();
public DefaultOperationQuota(final QuotaLimiter... limiters) { public DefaultOperationQuota(final QuotaLimiter... limiters) {
this(Arrays.asList(limiters)); this(Arrays.asList(limiters));
@ -43,6 +42,12 @@ public class DefaultOperationQuota implements OperationQuota {
*/ */
public DefaultOperationQuota(final List<QuotaLimiter> limiters) { public DefaultOperationQuota(final List<QuotaLimiter> limiters) {
this.limiters = limiters; this.limiters = limiters;
int size = OperationType.values().length;
operationSize = new long[size];
for (int i = 0; i < size; ++i) {
operationSize[i] = 0;
}
} }
@Override @Override
@ -68,22 +73,12 @@ public class DefaultOperationQuota implements OperationQuota {
@Override @Override
public void close() { public void close() {
// Calculate and set the average size of get, scan and mutate for the current operation
long getSize = avgOpSize.getAvgOperationSize(OperationType.GET);
long scanSize = avgOpSize.getAvgOperationSize(OperationType.SCAN);
long mutationSize = avgOpSize.getAvgOperationSize(OperationType.MUTATE);
for (final QuotaLimiter limiter : limiters) {
limiter.addOperationSize(OperationType.GET, getSize);
limiter.addOperationSize(OperationType.SCAN, scanSize);
limiter.addOperationSize(OperationType.MUTATE, mutationSize);
}
// Adjust the quota consumed for the specified operation // Adjust the quota consumed for the specified operation
long writeDiff = avgOpSize.getOperationSize(OperationType.MUTATE) - writeConsumed; long writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
long readDiff = long readDiff = operationSize[OperationType.GET.ordinal()] +
(avgOpSize.getOperationSize(OperationType.GET) + avgOpSize operationSize[OperationType.SCAN.ordinal()] - readConsumed;
.getOperationSize(OperationType.SCAN)) - readConsumed;
for (final QuotaLimiter limiter : limiters) { for (final QuotaLimiter limiter: limiters) {
if (writeDiff != 0) limiter.consumeWrite(writeDiff); if (writeDiff != 0) limiter.consumeWrite(writeDiff);
if (readDiff != 0) limiter.consumeRead(readDiff); if (readDiff != 0) limiter.consumeRead(readDiff);
} }
@ -101,33 +96,21 @@ public class DefaultOperationQuota implements OperationQuota {
@Override @Override
public void addGetResult(final Result result) { public void addGetResult(final Result result) {
avgOpSize.addGetResult(result); operationSize[OperationType.GET.ordinal()] += QuotaUtil.calculateResultSize(result);
} }
@Override @Override
public void addScanResult(final List<Result> results) { public void addScanResult(final List<Result> results) {
avgOpSize.addScanResult(results); operationSize[OperationType.SCAN.ordinal()] += QuotaUtil.calculateResultSize(results);
} }
@Override @Override
public void addMutation(final Mutation mutation) { public void addMutation(final Mutation mutation) {
avgOpSize.addMutation(mutation); operationSize[OperationType.MUTATE.ordinal()] += QuotaUtil.calculateMutationSize(mutation);
}
@Override
public long getAvgOperationSize(OperationType type) {
return avgOpSize.getAvgOperationSize(type);
} }
private long estimateConsume(final OperationType type, int numReqs, long avgSize) { private long estimateConsume(final OperationType type, int numReqs, long avgSize) {
if (numReqs > 0) { if (numReqs > 0) {
for (final QuotaLimiter limiter : limiters) {
long size = limiter.getAvgOperationSize(type);
if (size > 0) {
avgSize = size;
break;
}
}
return avgSize * numReqs; return avgSize * numReqs;
} }
return 0; return 0;

View File

@ -68,9 +68,4 @@ final class NoopOperationQuota implements OperationQuota {
public long getWriteAvailable() { public long getWriteAvailable() {
return Long.MAX_VALUE; return Long.MAX_VALUE;
} }
@Override
public long getAvgOperationSize(OperationType type) {
return -1;
}
} }

View File

@ -62,15 +62,6 @@ final class NoopQuotaLimiter implements QuotaLimiter {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public void addOperationSize(OperationType type, long size) {
}
@Override
public long getAvgOperationSize(OperationType type) {
return -1;
}
@Override @Override
public String toString() { public String toString() {
return "NoopQuotaLimiter"; return "NoopQuotaLimiter";

View File

@ -29,58 +29,10 @@ public interface OperationQuota {
} }
/** /**
* Keeps track of the average data size of operations like get, scan, mutate * Checks if it is possible to execute the specified operation.
*/ * The quota will be estimated based on the number of operations to perform
public class AvgOperationSize { * and the average size accumulated during time.
private final long[] sizeSum; *
private final long[] count;
public AvgOperationSize() {
int size = OperationType.values().length;
sizeSum = new long[size];
count = new long[size];
for (int i = 0; i < size; ++i) {
sizeSum[i] = 0;
count[i] = 0;
}
}
public void addOperationSize(OperationType type, long size) {
if (size > 0) {
int index = type.ordinal();
sizeSum[index] += size;
count[index]++;
}
}
public long getAvgOperationSize(OperationType type) {
int index = type.ordinal();
return count[index] > 0 ? sizeSum[index] / count[index] : 0;
}
public long getOperationSize(OperationType type) {
return sizeSum[type.ordinal()];
}
public void addGetResult(final Result result) {
long size = QuotaUtil.calculateResultSize(result);
addOperationSize(OperationType.GET, size);
}
public void addScanResult(final List<Result> results) {
long size = QuotaUtil.calculateResultSize(results);
addOperationSize(OperationType.SCAN, size);
}
public void addMutation(final Mutation mutation) {
long size = QuotaUtil.calculateMutationSize(mutation);
addOperationSize(OperationType.MUTATE, size);
}
}
/**
* Checks if it is possible to execute the specified operation. The quota will be estimated based
* on the number of operations to perform and the average size accumulated during time.
* @param numWrites number of write operation that will be performed * @param numWrites number of write operation that will be performed
* @param numReads number of small-read operation that will be performed * @param numReads number of small-read operation that will be performed
* @param numScans number of long-read operation that will be performed * @param numScans number of long-read operation that will be performed
@ -114,7 +66,4 @@ public interface OperationQuota {
/** @return the number of bytes available to write to avoid exceeding the quota */ /** @return the number of bytes available to write to avoid exceeding the quota */
long getWriteAvailable(); long getWriteAvailable();
/** @return the average data size of the specified operation */
long getAvgOperationSize(OperationType type);
} }

View File

@ -68,13 +68,4 @@ public interface QuotaLimiter {
/** @return the number of bytes available to write to avoid exceeding the quota */ /** @return the number of bytes available to write to avoid exceeding the quota */
long getWriteAvailable(); long getWriteAvailable();
/**
* Add the average size of the specified operation type.
* The average will be used as estimate for the next operations.
*/
void addOperationSize(OperationType type, long size);
/** @return the average data size of the specified operation */
long getAvgOperationSize(OperationType type);
} }

View File

@ -111,7 +111,15 @@ public abstract class RateLimiter {
public synchronized void update(final RateLimiter other) { public synchronized void update(final RateLimiter other) {
this.tunit = other.tunit; this.tunit = other.tunit;
if (this.limit < other.limit) { if (this.limit < other.limit) {
this.avail += (other.limit - this.limit); // If avail is capped to this.limit, it will never overflow,
// otherwise, avail may overflow, just be careful here.
long diff = other.limit - this.limit;
if (this.avail <= Long.MAX_VALUE - diff) {
this.avail += diff;
this.avail = Math.min(this.avail, other.limit);
} else {
this.avail = other.limit;
}
} }
this.limit = other.limit; this.limit = other.limit;
} }
@ -142,10 +150,14 @@ public abstract class RateLimiter {
/** /**
* Are there enough available resources to allow execution? * Are there enough available resources to allow execution?
* @param amount the number of required resources * @param amount the number of required resources, a non-negative number
* @return true if there are enough available resources, otherwise false * @return true if there are enough available resources, otherwise false
*/ */
public synchronized boolean canExecute(final long amount) { public synchronized boolean canExecute(final long amount) {
if (isBypass()) {
return true;
}
long refillAmount = refill(limit); long refillAmount = refill(limit);
if (refillAmount == 0 && avail < amount) { if (refillAmount == 0 && avail < amount) {
return false; return false;
@ -170,13 +182,27 @@ public abstract class RateLimiter {
} }
/** /**
* consume amount available units. * consume amount available units, amount could be a negative number
* @param amount the number of units to consume * @param amount the number of units to consume
*/ */
public synchronized void consume(final long amount) { public synchronized void consume(final long amount) {
this.avail -= amount;
if (this.avail < 0) { if (isBypass()) {
this.avail = 0; return;
}
if (amount >= 0 ) {
this.avail -= amount;
if (this.avail < 0) {
this.avail = 0;
}
} else {
if (this.avail <= Long.MAX_VALUE + amount) {
this.avail -= amount;
this.avail = Math.min(this.avail, this.limit);
} else {
this.avail = this.limit;
}
} }
} }

View File

@ -26,8 +26,6 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle; import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle;
import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota; import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota;
import org.apache.hadoop.hbase.quotas.OperationQuota.AvgOperationSize;
import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
/** /**
* Simple time based limiter that checks the quota Throttle * Simple time based limiter that checks the quota Throttle
@ -42,7 +40,6 @@ public class TimeBasedLimiter implements QuotaLimiter {
private RateLimiter writeSizeLimiter = null; private RateLimiter writeSizeLimiter = null;
private RateLimiter readReqsLimiter = null; private RateLimiter readReqsLimiter = null;
private RateLimiter readSizeLimiter = null; private RateLimiter readSizeLimiter = null;
private AvgOperationSize avgOpSize = new AvgOperationSize();
private TimeBasedLimiter() { private TimeBasedLimiter() {
if (FixedIntervalRateLimiter.class.getName().equals( if (FixedIntervalRateLimiter.class.getName().equals(
@ -185,16 +182,6 @@ public class TimeBasedLimiter implements QuotaLimiter {
return readSizeLimiter.getAvailable(); return readSizeLimiter.getAvailable();
} }
@Override
public void addOperationSize(OperationType type, long size) {
avgOpSize.addOperationSize(type, size);
}
@Override
public long getAvgOperationSize(OperationType type) {
return avgOpSize.getAvgOperationSize(type);
}
@Override @Override
public String toString() { public String toString() {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();

View File

@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@ -277,4 +278,133 @@ public class TestRateLimiter {
limiter.setNextRefillTime(limiter.getNextRefillTime() - 5000); limiter.setNextRefillTime(limiter.getNextRefillTime() - 5000);
assertEquals(60, limiter.refill(limiter.getLimit())); assertEquals(60, limiter.refill(limiter.getLimit()));
} }
@Test
public void testUnconfiguredLimiters() throws InterruptedException {
ManualEnvironmentEdge testEdge = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(testEdge);
long limit = Long.MAX_VALUE;
// For unconfigured limiters, it is supposed to use as much as possible
RateLimiter avgLimiter = new AverageIntervalRateLimiter();
RateLimiter fixLimiter = new FixedIntervalRateLimiter();
assertEquals(limit, avgLimiter.getAvailable());
assertEquals(limit, fixLimiter.getAvailable());
assertTrue(avgLimiter.canExecute(limit));
avgLimiter.consume(limit);
assertTrue(fixLimiter.canExecute(limit));
fixLimiter.consume(limit);
// Make sure that available is Long.MAX_VALUE
assertTrue(limit == avgLimiter.getAvailable());
assertTrue(limit == fixLimiter.getAvailable());
// after 100 millseconds, it should be able to execute limit as well
testEdge.incValue(100);
assertTrue(avgLimiter.canExecute(limit));
avgLimiter.consume(limit);
assertTrue(fixLimiter.canExecute(limit));
fixLimiter.consume(limit);
// Make sure that available is Long.MAX_VALUE
assertTrue(limit == avgLimiter.getAvailable());
assertTrue(limit == fixLimiter.getAvailable());
EnvironmentEdgeManager.reset();
}
@Test
public void testExtremeLimiters() throws InterruptedException {
ManualEnvironmentEdge testEdge = new ManualEnvironmentEdge();
EnvironmentEdgeManager.injectEdge(testEdge);
long limit = Long.MAX_VALUE - 1;
RateLimiter avgLimiter = new AverageIntervalRateLimiter();
avgLimiter.set(limit, TimeUnit.SECONDS);
RateLimiter fixLimiter = new FixedIntervalRateLimiter();
fixLimiter.set(limit, TimeUnit.SECONDS);
assertEquals(limit, avgLimiter.getAvailable());
assertEquals(limit, fixLimiter.getAvailable());
assertTrue(avgLimiter.canExecute(limit / 2));
avgLimiter.consume(limit / 2);
assertTrue(fixLimiter.canExecute(limit / 2));
fixLimiter.consume(limit / 2);
// Make sure that available is whatever left
assertTrue((limit - (limit / 2)) == avgLimiter.getAvailable());
assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable());
// after 100 millseconds, both should not be able to execute the limit
testEdge.incValue(100);
assertFalse(avgLimiter.canExecute(limit));
assertFalse(fixLimiter.canExecute(limit));
// after 500 millseconds, average interval limiter should be able to execute the limit
testEdge.incValue(500);
assertTrue(avgLimiter.canExecute(limit));
assertFalse(fixLimiter.canExecute(limit));
// Make sure that available is correct
assertTrue(limit == avgLimiter.getAvailable());
assertTrue((limit - (limit / 2)) == fixLimiter.getAvailable());
// after 500 millseconds, both should be able to execute
testEdge.incValue(500);
assertTrue(avgLimiter.canExecute(limit));
assertTrue(fixLimiter.canExecute(limit));
// Make sure that available is Long.MAX_VALUE
assertTrue(limit == avgLimiter.getAvailable());
assertTrue(limit == fixLimiter.getAvailable());
EnvironmentEdgeManager.reset();
}
/*
* This test case is tricky. Basically, it simulates the following events:
* Thread-1 Thread-2
* t0: canExecute(100) and consume(100)
* t1: canExecute(100), avail may be increased by 80
* t2: consume(-80) as actual size is 20
* It will check if consume(-80) can handle overflow correctly.
*/
@Test
public void testLimiterCompensationOverflow() throws InterruptedException {
long limit = Long.MAX_VALUE - 1;
long guessNumber = 100;
// For unconfigured limiters, it is supposed to use as much as possible
RateLimiter avgLimiter = new AverageIntervalRateLimiter();
avgLimiter.set(limit, TimeUnit.SECONDS);
assertEquals(limit, avgLimiter.getAvailable());
// The initial guess is that 100 bytes.
assertTrue(avgLimiter.canExecute(guessNumber));
avgLimiter.consume(guessNumber);
// Make sure that available is whatever left
assertTrue((limit - guessNumber) == avgLimiter.getAvailable());
// Manually set avil to simulate that another thread call canExecute().
// It is simulated by consume().
avgLimiter.consume(-80);
assertTrue((limit - guessNumber + 80) == avgLimiter.getAvailable());
// Now thread1 compensates 80
avgLimiter.consume(-80);
assertTrue(limit == avgLimiter.getAvailable());
}
} }