HBASE-13888 Fix refill bug from HBASE-13686 (Guanghao Zhang)
This commit is contained in:
parent
8d1d6c16da
commit
84a50393ee
|
@ -26,7 +26,7 @@ public class AverageIntervalRateLimiter extends RateLimiter {
|
||||||
private long nextRefillTime = -1L;
|
private long nextRefillTime = -1L;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long refill(long limit, long available) {
|
public long refill(long limit) {
|
||||||
final long now = EnvironmentEdgeManager.currentTime();
|
final long now = EnvironmentEdgeManager.currentTime();
|
||||||
if (nextRefillTime == -1) {
|
if (nextRefillTime == -1) {
|
||||||
// Till now no resource has been consumed.
|
// Till now no resource has been consumed.
|
||||||
|
@ -37,7 +37,7 @@ public class AverageIntervalRateLimiter extends RateLimiter {
|
||||||
long delta = (limit * (now - nextRefillTime)) / super.getTimeUnitInMillis();
|
long delta = (limit * (now - nextRefillTime)) / super.getTimeUnitInMillis();
|
||||||
if (delta > 0) {
|
if (delta > 0) {
|
||||||
this.nextRefillTime = now;
|
this.nextRefillTime = now;
|
||||||
return Math.min(limit, available + delta);
|
return Math.min(limit, delta);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,7 @@ public class FixedIntervalRateLimiter extends RateLimiter {
|
||||||
private long nextRefillTime = -1L;
|
private long nextRefillTime = -1L;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long refill(long limit, long available) {
|
public long refill(long limit) {
|
||||||
final long now = EnvironmentEdgeManager.currentTime();
|
final long now = EnvironmentEdgeManager.currentTime();
|
||||||
if (now < nextRefillTime) {
|
if (now < nextRefillTime) {
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -54,9 +54,9 @@ public abstract class RateLimiter {
|
||||||
/**
|
/**
|
||||||
* Refill the available units w.r.t the elapsed time.
|
* Refill the available units w.r.t the elapsed time.
|
||||||
* @param limit Maximum available resource units that can be refilled to.
|
* @param limit Maximum available resource units that can be refilled to.
|
||||||
* @param available Currently available resource units
|
* @return how many resource units may be refilled ?
|
||||||
*/
|
*/
|
||||||
abstract long refill(long limit, long available);
|
abstract long refill(long limit);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Time in milliseconds to wait for before requesting to consume 'amount' resource.
|
* Time in milliseconds to wait for before requesting to consume 'amount' resource.
|
||||||
|
@ -149,7 +149,7 @@ public abstract class RateLimiter {
|
||||||
* @return true if there are enough available resources, otherwise false
|
* @return true if there are enough available resources, otherwise false
|
||||||
*/
|
*/
|
||||||
public synchronized boolean canExecute(final long amount) {
|
public synchronized boolean canExecute(final long amount) {
|
||||||
long refillAmount = refill(limit, avail);
|
long refillAmount = refill(limit);
|
||||||
if (refillAmount == 0 && avail < amount) {
|
if (refillAmount == 0 && avail < amount) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -170,4 +171,118 @@ public class TestRateLimiter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
@Test
|
||||||
|
public void testCanExecuteOfAverageIntervalRateLimiter() throws InterruptedException {
|
||||||
|
RateLimiter limiter = new AverageIntervalRateLimiter();
|
||||||
|
// when set limit is 100 per sec, this AverageIntervalRateLimiter will support at max 200 per sec
|
||||||
|
limiter.set(100, TimeUnit.SECONDS);
|
||||||
|
limiter.setNextRefillTime(EnvironmentEdgeManager.currentTime());
|
||||||
|
assertEquals(50, testCanExecuteByRate(limiter, 50));
|
||||||
|
|
||||||
|
// refill the avail to limit
|
||||||
|
limiter.set(100, TimeUnit.SECONDS);
|
||||||
|
limiter.setNextRefillTime(EnvironmentEdgeManager.currentTime());
|
||||||
|
assertEquals(100, testCanExecuteByRate(limiter, 100));
|
||||||
|
|
||||||
|
// refill the avail to limit
|
||||||
|
limiter.set(100, TimeUnit.SECONDS);
|
||||||
|
limiter.setNextRefillTime(EnvironmentEdgeManager.currentTime());
|
||||||
|
assertEquals(200, testCanExecuteByRate(limiter, 200));
|
||||||
|
|
||||||
|
// refill the avail to limit
|
||||||
|
limiter.set(100, TimeUnit.SECONDS);
|
||||||
|
limiter.setNextRefillTime(EnvironmentEdgeManager.currentTime());
|
||||||
|
assertEquals(200, testCanExecuteByRate(limiter, 500));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCanExecuteOfFixedIntervalRateLimiter() throws InterruptedException {
|
||||||
|
RateLimiter limiter = new FixedIntervalRateLimiter();
|
||||||
|
// when set limit is 100 per sec, this FixedIntervalRateLimiter will support at max 100 per sec
|
||||||
|
limiter.set(100, TimeUnit.SECONDS);
|
||||||
|
limiter.setNextRefillTime(EnvironmentEdgeManager.currentTime());
|
||||||
|
assertEquals(50, testCanExecuteByRate(limiter, 50));
|
||||||
|
|
||||||
|
// refill the avail to limit
|
||||||
|
limiter.set(100, TimeUnit.SECONDS);
|
||||||
|
limiter.setNextRefillTime(EnvironmentEdgeManager.currentTime());
|
||||||
|
assertEquals(100, testCanExecuteByRate(limiter, 100));
|
||||||
|
|
||||||
|
// refill the avail to limit
|
||||||
|
limiter.set(100, TimeUnit.SECONDS);
|
||||||
|
limiter.setNextRefillTime(EnvironmentEdgeManager.currentTime());
|
||||||
|
assertEquals(100, testCanExecuteByRate(limiter, 200));
|
||||||
|
}
|
||||||
|
|
||||||
|
public int testCanExecuteByRate(RateLimiter limiter, int rate) {
|
||||||
|
int request = 0;
|
||||||
|
int count = 0;
|
||||||
|
while ((request++) < rate) {
|
||||||
|
limiter.setNextRefillTime(limiter.getNextRefillTime() - limiter.getTimeUnitInMillis() / rate);
|
||||||
|
if (limiter.canExecute()) {
|
||||||
|
count++;
|
||||||
|
limiter.consume();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRefillOfAverageIntervalRateLimiter() throws InterruptedException {
|
||||||
|
RateLimiter limiter = new AverageIntervalRateLimiter();
|
||||||
|
limiter.set(60, TimeUnit.SECONDS);
|
||||||
|
assertEquals(60, limiter.getAvailable());
|
||||||
|
// first refill, will return the number same with limit
|
||||||
|
assertEquals(60, limiter.refill(limiter.getLimit()));
|
||||||
|
|
||||||
|
limiter.consume(30);
|
||||||
|
|
||||||
|
// after 0.2 sec, refill should return 12
|
||||||
|
limiter.setNextRefillTime(limiter.getNextRefillTime() - 200);
|
||||||
|
assertEquals(12, limiter.refill(limiter.getLimit()));
|
||||||
|
|
||||||
|
// after 0.5 sec, refill should return 30
|
||||||
|
limiter.setNextRefillTime(limiter.getNextRefillTime() - 500);
|
||||||
|
assertEquals(30, limiter.refill(limiter.getLimit()));
|
||||||
|
|
||||||
|
// after 1 sec, refill should return 60
|
||||||
|
limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000);
|
||||||
|
assertEquals(60, limiter.refill(limiter.getLimit()));
|
||||||
|
|
||||||
|
// after more than 1 sec, refill should return at max 60
|
||||||
|
limiter.setNextRefillTime(limiter.getNextRefillTime() - 3000);
|
||||||
|
assertEquals(60, limiter.refill(limiter.getLimit()));
|
||||||
|
limiter.setNextRefillTime(limiter.getNextRefillTime() - 5000);
|
||||||
|
assertEquals(60, limiter.refill(limiter.getLimit()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRefillOfFixedIntervalRateLimiter() throws InterruptedException {
|
||||||
|
RateLimiter limiter = new FixedIntervalRateLimiter();
|
||||||
|
limiter.set(60, TimeUnit.SECONDS);
|
||||||
|
assertEquals(60, limiter.getAvailable());
|
||||||
|
// first refill, will return the number same with limit
|
||||||
|
assertEquals(60, limiter.refill(limiter.getLimit()));
|
||||||
|
|
||||||
|
limiter.consume(30);
|
||||||
|
|
||||||
|
// after 0.2 sec, refill should return 0
|
||||||
|
limiter.setNextRefillTime(limiter.getNextRefillTime() - 200);
|
||||||
|
assertEquals(0, limiter.refill(limiter.getLimit()));
|
||||||
|
|
||||||
|
// after 0.5 sec, refill should return 0
|
||||||
|
limiter.setNextRefillTime(limiter.getNextRefillTime() - 500);
|
||||||
|
assertEquals(0, limiter.refill(limiter.getLimit()));
|
||||||
|
|
||||||
|
// after 1 sec, refill should return 60
|
||||||
|
limiter.setNextRefillTime(limiter.getNextRefillTime() - 1000);
|
||||||
|
assertEquals(60, limiter.refill(limiter.getLimit()));
|
||||||
|
|
||||||
|
// after more than 1 sec, refill should return at max 60
|
||||||
|
limiter.setNextRefillTime(limiter.getNextRefillTime() - 3000);
|
||||||
|
assertEquals(60, limiter.refill(limiter.getLimit()));
|
||||||
|
limiter.setNextRefillTime(limiter.getNextRefillTime() - 5000);
|
||||||
|
assertEquals(60, limiter.refill(limiter.getLimit()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue