HBASE-13686 - Fail to limit rate in RateLimiter (Ashish Singhi)

This commit is contained in:
ramkrishna 2015-06-07 16:34:44 +05:30
parent 587b0b4f20
commit 9f43a3bea6
5 changed files with 322 additions and 106 deletions

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.common.annotations.VisibleForTesting;
/**
* This limiter will refill resources at every TimeUnit/resources interval. For example: For a
* limiter configured with 10resources/second, then 1 resource will be refilled after every 100ms
* (1sec/10resources)
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class AverageIntervalRateLimiter extends RateLimiter {
private long nextRefillTime = -1L;
@Override
public long refill(long limit, long available) {
final long now = EnvironmentEdgeManager.currentTime();
if (nextRefillTime == -1) {
// Till now no resource has been consumed.
nextRefillTime = EnvironmentEdgeManager.currentTime();
return limit;
}
long delta = (limit * (now - nextRefillTime)) / super.getTimeUnitInMillis();
if (delta > 0) {
this.nextRefillTime = now;
return Math.min(limit, available + delta);
}
return 0;
}
@Override
public long getWaitInterval(long limit, long available, long amount) {
if (nextRefillTime == -1) {
return 0;
}
long timeUnitInMillis = super.getTimeUnitInMillis();
return ((amount * timeUnitInMillis) / limit) - ((available * timeUnitInMillis) / limit);
}
// This method is for strictly testing purpose only
@VisibleForTesting
public void setNextRefillTime(long nextRefillTime) {
this.nextRefillTime = nextRefillTime;
}
@VisibleForTesting
public long getNextRefillTime() {
return this.nextRefillTime;
}
}

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.common.annotations.VisibleForTesting;
/**
* With this limiter resources will be refilled only after a fixed interval of time.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FixedIntervalRateLimiter extends RateLimiter {
private long nextRefillTime = -1L;
@Override
public long refill(long limit, long available) {
final long now = EnvironmentEdgeManager.currentTime();
if (now < nextRefillTime) {
return 0;
}
nextRefillTime = now + super.getTimeUnitInMillis();
return limit;
}
@Override
public long getWaitInterval(long limit, long available, long amount) {
if (nextRefillTime == -1) {
return 0;
}
final long now = EnvironmentEdgeManager.currentTime();
final long refillTime = nextRefillTime;
return refillTime - now;
}
// This method is for strictly testing purpose only
@VisibleForTesting
public void setNextRefillTime(long nextRefillTime) {
this.nextRefillTime = nextRefillTime;
}
@VisibleForTesting
public long getNextRefillTime() {
return this.nextRefillTime;
}
}

View File

@ -23,19 +23,20 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import com.google.common.annotations.VisibleForTesting;
/** /**
* Simple rate limiter. * Simple rate limiter.
* *
* Usage Example: * Usage Example:
* RateLimiter limiter = new RateLimiter(); // At this point you have a unlimited resource limiter * // At this point you have a unlimited resource limiter
* RateLimiter limiter = new AverageIntervalRateLimiter();
* or new FixedIntervalRateLimiter();
* limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec * limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec
* *
* long lastTs = 0; // You need to keep track of the last update timestamp
* while (true) { * while (true) {
* long now = System.currentTimeMillis();
*
* // call canExecute before performing resource consuming operation * // call canExecute before performing resource consuming operation
* bool canExecute = limiter.canExecute(now, lastTs); * bool canExecute = limiter.canExecute();
* // If there are no available resources, wait until one is available * // If there are no available resources, wait until one is available
* if (!canExecute) Thread.sleep(limiter.waitInterval()); * if (!canExecute) Thread.sleep(limiter.waitInterval());
* // ...execute the work and consume the resource... * // ...execute the work and consume the resource...
@ -44,13 +45,28 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class RateLimiter { public abstract class RateLimiter {
public static final String QUOTA_RATE_LIMITER_CONF_KEY = "hbase.quota.rate.limiter";
private long tunit = 1000; // Timeunit factor for translating to ms. private long tunit = 1000; // Timeunit factor for translating to ms.
private long limit = Long.MAX_VALUE; // The max value available resource units can be refilled to. private long limit = Long.MAX_VALUE; // The max value available resource units can be refilled to.
private long avail = Long.MAX_VALUE; // Currently available resource units private long avail = Long.MAX_VALUE; // Currently available resource units
public RateLimiter() { /**
} * Refill the available units w.r.t the elapsed time.
* @param limit Maximum available resource units that can be refilled to.
* @param available Currently available resource units
*/
abstract long refill(long limit, long available);
/**
* Time in milliseconds to wait for before requesting to consume 'amount' resource.
* @param limit Maximum available resource units that can be refilled to.
* @param available Currently available resource units
* @param amount Resources for which time interval to calculate for
* @return estimate of the ms required to wait before being able to provide 'amount' resources.
*/
abstract long getWaitInterval(long limit, long available, long amount);
/** /**
* Set the RateLimiter max available resources and refill period. * Set the RateLimiter max available resources and refill period.
@ -59,35 +75,34 @@ public class RateLimiter {
*/ */
public void set(final long limit, final TimeUnit timeUnit) { public void set(final long limit, final TimeUnit timeUnit) {
switch (timeUnit) { switch (timeUnit) {
case NANOSECONDS: case MILLISECONDS:
throw new RuntimeException("Unsupported NANOSECONDS TimeUnit"); tunit = 1;
case MICROSECONDS: break;
throw new RuntimeException("Unsupported MICROSECONDS TimeUnit"); case SECONDS:
case MILLISECONDS: tunit = 1000;
tunit = 1; break;
break; case MINUTES:
case SECONDS: tunit = 60 * 1000;
tunit = 1000; break;
break; case HOURS:
case MINUTES: tunit = 60 * 60 * 1000;
tunit = 60 * 1000; break;
break; case DAYS:
case HOURS: tunit = 24 * 60 * 60 * 1000;
tunit = 60 * 60 * 1000; break;
break; default:
case DAYS: throw new RuntimeException("Unsupported " + timeUnit.name() + " TimeUnit.");
tunit = 24 * 60 * 60 * 1000;
break;
} }
this.limit = limit; this.limit = limit;
this.avail = limit; this.avail = limit;
} }
public String toString() { public String toString() {
String rateLimiter = this.getClass().getSimpleName();
if (limit == Long.MAX_VALUE) { if (limit == Long.MAX_VALUE) {
return "RateLimiter(Bypass)"; return rateLimiter + "(Bypass)";
} }
return "RateLimiter(avail=" + avail + " limit=" + limit + " tunit=" + tunit + ")"; return rateLimiter + "(avail=" + avail + " limit=" + limit + " tunit=" + tunit + ")";
} }
/** /**
@ -116,25 +131,38 @@ public class RateLimiter {
return avail; return avail;
} }
/** protected long getTimeUnitInMillis() {
* given the time interval, is there at least one resource available to allow execution? return tunit;
* @param now the current timestamp
* @param lastTs the timestamp of the last update
* @return true if there is at least one resource available, otherwise false
*/
public boolean canExecute(final long now, final long lastTs) {
return canExecute(now, lastTs, 1);
} }
/** /**
* given the time interval, are there enough available resources to allow execution? * Is there at least one resource available to allow execution?
* @param now the current timestamp * @return true if there is at least one resource available, otherwise false
* @param lastTs the timestamp of the last update */
public boolean canExecute() {
return canExecute(1);
}
/**
* Are there enough available resources to allow execution?
* @param amount the number of required resources * @param amount the number of required resources
* @return true if there are enough available resources, otherwise false * @return true if there are enough available resources, otherwise false
*/ */
public synchronized boolean canExecute(final long now, final long lastTs, final long amount) { public synchronized boolean canExecute(final long amount) {
return avail >= amount ? true : refill(now, lastTs) >= amount; long refillAmount = refill(limit, avail);
if (refillAmount == 0 && avail < amount) {
return false;
}
// check for positive overflow
if (avail <= Long.MAX_VALUE - refillAmount) {
avail = Math.max(0, Math.min(avail + refillAmount, limit));
} else {
avail = Math.max(0, limit);
}
if (avail >= amount) {
return true;
}
return false;
} }
/** /**
@ -150,6 +178,9 @@ public class RateLimiter {
*/ */
public synchronized void consume(final long amount) { public synchronized void consume(final long amount) {
this.avail -= amount; this.avail -= amount;
if (this.avail < 0) {
this.avail = 0;
}
} }
/** /**
@ -164,18 +195,16 @@ public class RateLimiter {
*/ */
public synchronized long waitInterval(final long amount) { public synchronized long waitInterval(final long amount) {
// TODO Handle over quota? // TODO Handle over quota?
return (amount <= avail) ? 0 : ((amount * tunit) / limit) - ((avail * tunit) / limit); return (amount <= avail) ? 0 : getWaitInterval(limit, avail, amount);
} }
/** // This method is for strictly testing purpose only
* given the specified time interval, refill the avilable units to the proportionate @VisibleForTesting
* to elapsed time or to the prespecified limit. public void setNextRefillTime(long nextRefillTime) {
*/ this.setNextRefillTime(nextRefillTime);
private long refill(final long now, final long lastTs) { }
long delta = (limit * (now - lastTs)) / tunit;
if (delta > 0) { public long getNextRefillTime() {
avail = Math.min(limit, avail + delta); return this.getNextRefillTime();
}
return avail;
} }
} }

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.quotas; package org.apache.hadoop.hbase.quotas;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -26,7 +28,6 @@ import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle;
import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota; import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota;
import org.apache.hadoop.hbase.quotas.OperationQuota.AvgOperationSize; import org.apache.hadoop.hbase.quotas.OperationQuota.AvgOperationSize;
import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType; import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/** /**
* Simple time based limiter that checks the quota Throttle * Simple time based limiter that checks the quota Throttle
@ -34,18 +35,33 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class TimeBasedLimiter implements QuotaLimiter { public class TimeBasedLimiter implements QuotaLimiter {
private long writeLastTs = 0; private static final Configuration conf = HBaseConfiguration.create();
private long readLastTs = 0; private RateLimiter reqsLimiter = null;
private RateLimiter reqSizeLimiter = null;
private RateLimiter reqsLimiter = new RateLimiter(); private RateLimiter writeReqsLimiter = null;
private RateLimiter reqSizeLimiter = new RateLimiter(); private RateLimiter writeSizeLimiter = null;
private RateLimiter writeReqsLimiter = new RateLimiter(); private RateLimiter readReqsLimiter = null;
private RateLimiter writeSizeLimiter = new RateLimiter(); private RateLimiter readSizeLimiter = null;
private RateLimiter readReqsLimiter = new RateLimiter();
private RateLimiter readSizeLimiter = new RateLimiter();
private AvgOperationSize avgOpSize = new AvgOperationSize(); private AvgOperationSize avgOpSize = new AvgOperationSize();
private TimeBasedLimiter() { private TimeBasedLimiter() {
if (FixedIntervalRateLimiter.class.getName().equals(
conf.getClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY, AverageIntervalRateLimiter.class)
.getName())) {
reqsLimiter = new FixedIntervalRateLimiter();
reqSizeLimiter = new FixedIntervalRateLimiter();
writeReqsLimiter = new FixedIntervalRateLimiter();
writeSizeLimiter = new FixedIntervalRateLimiter();
readReqsLimiter = new FixedIntervalRateLimiter();
readSizeLimiter = new FixedIntervalRateLimiter();
} else {
reqsLimiter = new AverageIntervalRateLimiter();
reqSizeLimiter = new AverageIntervalRateLimiter();
writeReqsLimiter = new AverageIntervalRateLimiter();
writeSizeLimiter = new AverageIntervalRateLimiter();
readReqsLimiter = new AverageIntervalRateLimiter();
readSizeLimiter = new AverageIntervalRateLimiter();
}
} }
static QuotaLimiter fromThrottle(final Throttle throttle) { static QuotaLimiter fromThrottle(final Throttle throttle) {
@ -97,33 +113,29 @@ public class TimeBasedLimiter implements QuotaLimiter {
} }
@Override @Override
public void checkQuota(long writeSize, long readSize) public void checkQuota(long writeSize, long readSize) throws ThrottlingException {
throws ThrottlingException { if (!reqsLimiter.canExecute()) {
long now = EnvironmentEdgeManager.currentTime();
long lastTs = Math.max(readLastTs, writeLastTs);
if (!reqsLimiter.canExecute(now, lastTs)) {
ThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval()); ThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval());
} }
if (!reqSizeLimiter.canExecute(now, lastTs, writeSize + readSize)) { if (!reqSizeLimiter.canExecute(writeSize + readSize)) {
ThrottlingException.throwRequestSizeExceeded(reqSizeLimiter ThrottlingException.throwRequestSizeExceeded(reqSizeLimiter
.waitInterval(writeSize + readSize)); .waitInterval(writeSize + readSize));
} }
if (writeSize > 0) { if (writeSize > 0) {
if (!writeReqsLimiter.canExecute(now, writeLastTs)) { if (!writeReqsLimiter.canExecute()) {
ThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval()); ThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval());
} }
if (!writeSizeLimiter.canExecute(now, writeLastTs, writeSize)) { if (!writeSizeLimiter.canExecute(writeSize)) {
ThrottlingException.throwWriteSizeExceeded(writeSizeLimiter.waitInterval(writeSize)); ThrottlingException.throwWriteSizeExceeded(writeSizeLimiter.waitInterval(writeSize));
} }
} }
if (readSize > 0) { if (readSize > 0) {
if (!readReqsLimiter.canExecute(now, readLastTs)) { if (!readReqsLimiter.canExecute()) {
ThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval()); ThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval());
} }
if (!readSizeLimiter.canExecute(now, readLastTs, readSize)) { if (!readSizeLimiter.canExecute(readSize)) {
ThrottlingException.throwReadSizeExceeded(readSizeLimiter.waitInterval(readSize)); ThrottlingException.throwReadSizeExceeded(readSizeLimiter.waitInterval(readSize));
} }
} }
@ -133,20 +145,16 @@ public class TimeBasedLimiter implements QuotaLimiter {
public void grabQuota(long writeSize, long readSize) { public void grabQuota(long writeSize, long readSize) {
assert writeSize != 0 || readSize != 0; assert writeSize != 0 || readSize != 0;
long now = EnvironmentEdgeManager.currentTime();
reqsLimiter.consume(1); reqsLimiter.consume(1);
reqSizeLimiter.consume(writeSize + readSize); reqSizeLimiter.consume(writeSize + readSize);
if (writeSize > 0) { if (writeSize > 0) {
writeReqsLimiter.consume(1); writeReqsLimiter.consume(1);
writeSizeLimiter.consume(writeSize); writeSizeLimiter.consume(writeSize);
writeLastTs = now;
} }
if (readSize > 0) { if (readSize > 0) {
readReqsLimiter.consume(1); readReqsLimiter.consume(1);
readSizeLimiter.consume(readSize); readSizeLimiter.consume(readSize);
readLastTs = now;
} }
} }

View File

@ -18,19 +18,17 @@
package org.apache.hadoop.hbase.quotas; package org.apache.hadoop.hbase.quotas;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/** /**
* Verify the behaviour of the Rate Limiter. * Verify the behaviour of the Rate Limiter.
*/ */
@ -58,16 +56,14 @@ public class TestRateLimiter {
private void testWaitInterval(final TimeUnit timeUnit, final long limit, private void testWaitInterval(final TimeUnit timeUnit, final long limit,
final long expectedWaitInterval) { final long expectedWaitInterval) {
RateLimiter limiter = new RateLimiter(); RateLimiter limiter = new AverageIntervalRateLimiter();
limiter.set(limit, timeUnit); limiter.set(limit, timeUnit);
long nowTs = 0; long nowTs = 0;
long lastTs = 0;
// consume all the available resources, one request at the time. // consume all the available resources, one request at the time.
// the wait interval should be 0 // the wait interval should be 0
for (int i = 0; i < (limit - 1); ++i) { for (int i = 0; i < (limit - 1); ++i) {
assertTrue(limiter.canExecute(nowTs, lastTs)); assertTrue(limiter.canExecute());
limiter.consume(); limiter.consume();
long waitInterval = limiter.waitInterval(); long waitInterval = limiter.waitInterval();
assertEquals(0, waitInterval); assertEquals(0, waitInterval);
@ -76,40 +72,102 @@ public class TestRateLimiter {
for (int i = 0; i < (limit * 4); ++i) { for (int i = 0; i < (limit * 4); ++i) {
// There is one resource available, so we should be able to // There is one resource available, so we should be able to
// consume it without waiting. // consume it without waiting.
assertTrue(limiter.canExecute(nowTs, lastTs)); limiter.setNextRefillTime(limiter.getNextRefillTime() - nowTs);
assertTrue(limiter.canExecute());
assertEquals(0, limiter.waitInterval()); assertEquals(0, limiter.waitInterval());
limiter.consume(); limiter.consume();
lastTs = nowTs;
// No more resources are available, we should wait for at least an interval. // No more resources are available, we should wait for at least an interval.
long waitInterval = limiter.waitInterval(); long waitInterval = limiter.waitInterval();
assertEquals(expectedWaitInterval, waitInterval); assertEquals(expectedWaitInterval, waitInterval);
// set the nowTs to be the exact time when resources should be available again. // set the nowTs to be the exact time when resources should be available again.
nowTs += waitInterval; nowTs = waitInterval;
// artificially go into the past to prove that when too early we should fail. // artificially go into the past to prove that when too early we should fail.
assertFalse(limiter.canExecute(nowTs - 500, lastTs)); long temp = nowTs + 500;
limiter.setNextRefillTime(limiter.getNextRefillTime() + temp);
assertFalse(limiter.canExecute());
//Roll back the nextRefillTime set to continue further testing
limiter.setNextRefillTime(limiter.getNextRefillTime() - temp);
} }
} }
@Test @Test
public void testOverconsumption() { public void testOverconsumptionAverageIntervalRefillStrategy() {
RateLimiter limiter = new RateLimiter(); RateLimiter limiter = new AverageIntervalRateLimiter();
limiter.set(10, TimeUnit.SECONDS); limiter.set(10, TimeUnit.SECONDS);
// 10 resources are available, but we need to consume 20 resources // 10 resources are available, but we need to consume 20 resources
// Verify that we have to wait at least 1.1sec to have 1 resource available // Verify that we have to wait at least 1.1sec to have 1 resource available
assertTrue(limiter.canExecute(0, 0)); assertTrue(limiter.canExecute());
limiter.consume(20); limiter.consume(20);
assertEquals(1100, limiter.waitInterval()); // To consume 1 resource wait for 100ms
assertEquals(100, limiter.waitInterval(1));
// To consume 10 resource wait for 1000ms
assertEquals(1000, limiter.waitInterval(10));
// Verify that after 1sec we need to wait for another 0.1sec to get a resource available limiter.setNextRefillTime(limiter.getNextRefillTime() - 900);
assertFalse(limiter.canExecute(1000, 0)); // Verify that after 1sec the 1 resource is available
assertEquals(100, limiter.waitInterval()); assertTrue(limiter.canExecute(1));
limiter.setNextRefillTime(limiter.getNextRefillTime() - 100);
// Verify that after 1.1sec the resource is available // Verify that after 1sec the 10 resource is available
assertTrue(limiter.canExecute(1100, 0)); assertTrue(limiter.canExecute());
assertEquals(0, limiter.waitInterval()); assertEquals(0, limiter.waitInterval());
} }
@Test
public void testOverconsumptionFixedIntervalRefillStrategy() throws InterruptedException {
RateLimiter limiter = new FixedIntervalRateLimiter();
limiter.set(10, TimeUnit.SECONDS);
// 10 resources are available, but we need to consume 20 resources
// Verify that we have to wait at least 1.1sec to have 1 resource available
assertTrue(limiter.canExecute());
limiter.consume(20);
// To consume 1 resource also wait for 1000ms
assertEquals(1000, limiter.waitInterval(1));
// To consume 10 resource wait for 100ms
assertEquals(1000, limiter.waitInterval(10));
limiter.setNextRefillTime(limiter.getNextRefillTime() - 900);
// Verify that after 1sec also no resource should be available
assertFalse(limiter.canExecute(1));
limiter.setNextRefillTime(limiter.getNextRefillTime() - 100);
// Verify that after 1sec the 10 resource is available
assertTrue(limiter.canExecute());
assertEquals(0, limiter.waitInterval());
}
@Test
public void testFixedIntervalResourceAvailability() throws Exception {
RateLimiter limiter = new FixedIntervalRateLimiter();
limiter.set(10, TimeUnit.MILLISECONDS);
assertTrue(limiter.canExecute(10));
limiter.consume(3);
assertEquals(7, limiter.getAvailable());
assertFalse(limiter.canExecute(10));
limiter.setNextRefillTime(limiter.getNextRefillTime() - 3);
assertTrue(limiter.canExecute(10));
assertEquals(10, limiter.getAvailable());
}
@Test
public void testLimiterBySmallerRate() throws InterruptedException {
// set limiter is 10 resources per seconds
RateLimiter limiter = new FixedIntervalRateLimiter();
limiter.set(10, TimeUnit.SECONDS);
int count = 0; // control the test count
while ((count++) < 10) {
// test will get 3 resources per 0.5 sec. so it will get 6 resources per sec.
limiter.setNextRefillTime(limiter.getNextRefillTime() - 500);
for (int i = 0; i < 3; i++) {
// 6 resources/sec < limit, so limiter.canExecute(nowTs, lastTs) should be true
assertEquals(true, limiter.canExecute());
limiter.consume();
}
}
}
} }