HBASE-13686 - Fail to limit rate in RateLimiter (Ashish Singhi)
This commit is contained in:
parent
deca3930b9
commit
d34e9c5c5c
|
@ -30,11 +30,12 @@ public class ThrottlingException extends QuotaExceededException {
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public enum Type {
|
public enum Type {
|
||||||
NumRequestsExceeded, NumReadRequestsExceeded, NumWriteRequestsExceeded, WriteSizeExceeded,
|
NumRequestsExceeded, RequestSizeExceeded, NumReadRequestsExceeded, NumWriteRequestsExceeded,
|
||||||
ReadSizeExceeded,
|
WriteSizeExceeded, ReadSizeExceeded,
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final String[] MSG_TYPE = new String[] { "number of requests exceeded",
|
private static final String[] MSG_TYPE =
|
||||||
|
new String[] { "number of requests exceeded", "request size limit exceeded",
|
||||||
"number of read requests exceeded", "number of write requests exceeded",
|
"number of read requests exceeded", "number of write requests exceeded",
|
||||||
"write size limit exceeded", "read size limit exceeded", };
|
"write size limit exceeded", "read size limit exceeded", };
|
||||||
|
|
||||||
|
@ -77,6 +78,11 @@ public class ThrottlingException extends QuotaExceededException {
|
||||||
throwThrottlingException(Type.NumRequestsExceeded, waitInterval);
|
throwThrottlingException(Type.NumRequestsExceeded, waitInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void throwRequestSizeExceeded(final long waitInterval)
|
||||||
|
throws ThrottlingException {
|
||||||
|
throwThrottlingException(Type.RequestSizeExceeded, waitInterval);
|
||||||
|
}
|
||||||
|
|
||||||
public static void throwNumReadRequestsExceeded(final long waitInterval)
|
public static void throwNumReadRequestsExceeded(final long waitInterval)
|
||||||
throws ThrottlingException {
|
throws ThrottlingException {
|
||||||
throwThrottlingException(Type.NumReadRequestsExceeded, waitInterval);
|
throwThrottlingException(Type.NumReadRequestsExceeded, waitInterval);
|
||||||
|
|
|
@ -0,0 +1,65 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||||
|
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||||
|
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||||
|
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||||
|
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||||
|
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||||
|
* for the specific language governing permissions and limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.quotas;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This limiter will refill resources at every TimeUnit/resources interval. For example: For a
|
||||||
|
* limiter configured with 10resources/second, then 1 resource will be refilled after every 100ms
|
||||||
|
* (1sec/10resources)
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public class AverageIntervalRateLimiter extends RateLimiter {
|
||||||
|
private long nextRefillTime = -1L;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long refill(long limit, long available) {
|
||||||
|
final long now = EnvironmentEdgeManager.currentTime();
|
||||||
|
if (nextRefillTime == -1) {
|
||||||
|
// Till now no resource has been consumed.
|
||||||
|
nextRefillTime = EnvironmentEdgeManager.currentTime();
|
||||||
|
return limit;
|
||||||
|
}
|
||||||
|
|
||||||
|
long delta = (limit * (now - nextRefillTime)) / super.getTimeUnitInMillis();
|
||||||
|
if (delta > 0) {
|
||||||
|
this.nextRefillTime = now;
|
||||||
|
return Math.min(limit, available + delta);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getWaitInterval(long limit, long available, long amount) {
|
||||||
|
if (nextRefillTime == -1) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
long timeUnitInMillis = super.getTimeUnitInMillis();
|
||||||
|
return ((amount * timeUnitInMillis) / limit) - ((available * timeUnitInMillis) / limit);
|
||||||
|
}
|
||||||
|
|
||||||
|
// This method is for strictly testing purpose only
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setNextRefillTime(long nextRefillTime) {
|
||||||
|
this.nextRefillTime = nextRefillTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNextRefillTime() {
|
||||||
|
return this.nextRefillTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,56 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||||
|
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||||
|
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||||
|
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||||
|
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||||
|
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||||
|
* for the specific language governing permissions and limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.quotas;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* With this limiter resources will be refilled only after a fixed interval of time.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public class FixedIntervalRateLimiter extends RateLimiter {
|
||||||
|
private long nextRefillTime = -1L;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long refill(long limit, long available) {
|
||||||
|
final long now = EnvironmentEdgeManager.currentTime();
|
||||||
|
if (now < nextRefillTime) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
nextRefillTime = now + super.getTimeUnitInMillis();
|
||||||
|
return limit;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getWaitInterval(long limit, long available, long amount) {
|
||||||
|
if (nextRefillTime == -1) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
final long now = EnvironmentEdgeManager.currentTime();
|
||||||
|
final long refillTime = nextRefillTime;
|
||||||
|
return refillTime - now;
|
||||||
|
}
|
||||||
|
|
||||||
|
// This method is for strictly testing purpose only
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setNextRefillTime(long nextRefillTime) {
|
||||||
|
this.nextRefillTime = nextRefillTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public long getNextRefillTime() {
|
||||||
|
return this.nextRefillTime;
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,36 +16,58 @@ import java.util.concurrent.TimeUnit;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Simple rate limiter. Usage Example: RateLimiter limiter = new RateLimiter(); // At this point you
|
* Simple rate limiter.
|
||||||
* have a unlimited resource limiter limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec long
|
*
|
||||||
* lastTs = 0; // You need to keep track of the last update timestamp while (true) { long now =
|
* Usage Example:
|
||||||
* System.currentTimeMillis(); // call canExecute before performing resource consuming operation
|
* // At this point you have a unlimited resource limiter
|
||||||
* bool canExecute = limiter.canExecute(now, lastTs); // If there are no available resources, wait
|
* RateLimiter limiter = new AverageIntervalRateLimiter();
|
||||||
* until one is available if (!canExecute) Thread.sleep(limiter.waitInterval()); // ...execute the
|
* or new FixedIntervalRateLimiter();
|
||||||
* work and consume the resource... limiter.consume(); }
|
* limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec
|
||||||
|
*
|
||||||
|
* while (true) {
|
||||||
|
* // call canExecute before performing resource consuming operation
|
||||||
|
* bool canExecute = limiter.canExecute();
|
||||||
|
* // If there are no available resources, wait until one is available
|
||||||
|
* if (!canExecute) Thread.sleep(limiter.waitInterval());
|
||||||
|
* // ...execute the work and consume the resource...
|
||||||
|
* limiter.consume();
|
||||||
|
* }
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class RateLimiter {
|
public abstract class RateLimiter {
|
||||||
|
public static final String QUOTA_RATE_LIMITER_CONF_KEY = "hbase.quota.rate.limiter";
|
||||||
private long tunit = 1000; // Timeunit factor for translating to ms.
|
private long tunit = 1000; // Timeunit factor for translating to ms.
|
||||||
private long limit = Long.MAX_VALUE; // The max value available resource units can be refilled to.
|
private long limit = Long.MAX_VALUE; // The max value available resource units can be refilled to.
|
||||||
private long avail = Long.MAX_VALUE; // Currently available resource units
|
private long avail = Long.MAX_VALUE; // Currently available resource units
|
||||||
|
|
||||||
public RateLimiter() {
|
/**
|
||||||
}
|
* Refill the available units w.r.t the elapsed time.
|
||||||
|
* @param limit Maximum available resource units that can be refilled to.
|
||||||
|
* @param available Currently available resource units
|
||||||
|
*/
|
||||||
|
abstract long refill(long limit, long available);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Time in milliseconds to wait for before requesting to consume 'amount' resource.
|
||||||
|
* @param limit Maximum available resource units that can be refilled to.
|
||||||
|
* @param available Currently available resource units
|
||||||
|
* @param amount Resources for which time interval to calculate for
|
||||||
|
* @return estimate of the ms required to wait before being able to provide 'amount' resources.
|
||||||
|
*/
|
||||||
|
abstract long getWaitInterval(long limit, long available, long amount);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the RateLimiter max available resources and refill period.
|
* Set the RateLimiter max available resources and refill period.
|
||||||
* @param limit The max value available resource units can be refilled to.
|
* @param limit The max value available resource units can be refilled to.
|
||||||
* @param timeUnit Timeunit factor for translating to ms.
|
* @param timeUnit Timeunit factor for translating to ms.
|
||||||
*/
|
*/
|
||||||
public synchronized void set(final long limit, final TimeUnit timeUnit) {
|
public void set(final long limit, final TimeUnit timeUnit) {
|
||||||
switch (timeUnit) {
|
switch (timeUnit) {
|
||||||
case NANOSECONDS:
|
|
||||||
throw new RuntimeException("Unsupported NANOSECONDS TimeUnit");
|
|
||||||
case MICROSECONDS:
|
|
||||||
throw new RuntimeException("Unsupported MICROSECONDS TimeUnit");
|
|
||||||
case MILLISECONDS:
|
case MILLISECONDS:
|
||||||
tunit = 1;
|
tunit = 1;
|
||||||
break;
|
break;
|
||||||
|
@ -62,23 +84,25 @@ public class RateLimiter {
|
||||||
tunit = 24 * 60 * 60 * 1000;
|
tunit = 24 * 60 * 60 * 1000;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
throw new RuntimeException("Invalid TimeUnit " + timeUnit);
|
throw new RuntimeException("Unsupported " + timeUnit.name() + " TimeUnit.");
|
||||||
}
|
}
|
||||||
this.limit = limit;
|
this.limit = limit;
|
||||||
this.avail = limit;
|
this.avail = limit;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String toString() {
|
public String toString() {
|
||||||
|
String rateLimiter = this.getClass().getSimpleName();
|
||||||
if (limit == Long.MAX_VALUE) {
|
if (limit == Long.MAX_VALUE) {
|
||||||
return "RateLimiter(Bypass)";
|
return rateLimiter + "(Bypass)";
|
||||||
}
|
}
|
||||||
return "RateLimiter(avail=" + avail + " limit=" + limit + " tunit=" + tunit + ")";
|
return rateLimiter + "(avail=" + avail + " limit=" + limit + " tunit=" + tunit + ")";
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the current instance of RateLimiter to a new values. if current limit is smaller than the
|
* Sets the current instance of RateLimiter to a new values.
|
||||||
* new limit, bump up the available resources. Otherwise allow clients to use up the previously
|
*
|
||||||
* available resources.
|
* if current limit is smaller than the new limit, bump up the available resources.
|
||||||
|
* Otherwise allow clients to use up the previously available resources.
|
||||||
*/
|
*/
|
||||||
public synchronized void update(final RateLimiter other) {
|
public synchronized void update(final RateLimiter other) {
|
||||||
this.tunit = other.tunit;
|
this.tunit = other.tunit;
|
||||||
|
@ -100,25 +124,38 @@ public class RateLimiter {
|
||||||
return avail;
|
return avail;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
protected long getTimeUnitInMillis() {
|
||||||
* given the time interval, is there at least one resource available to allow execution?
|
return tunit;
|
||||||
* @param now the current timestamp
|
|
||||||
* @param lastTs the timestamp of the last update
|
|
||||||
* @return true if there is at least one resource available, otherwise false
|
|
||||||
*/
|
|
||||||
public boolean canExecute(final long now, final long lastTs) {
|
|
||||||
return canExecute(now, lastTs, 1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* given the time interval, are there enough available resources to allow execution?
|
* Is there at least one resource available to allow execution?
|
||||||
* @param now the current timestamp
|
* @return true if there is at least one resource available, otherwise false
|
||||||
* @param lastTs the timestamp of the last update
|
*/
|
||||||
|
public boolean canExecute() {
|
||||||
|
return canExecute(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Are there enough available resources to allow execution?
|
||||||
* @param amount the number of required resources
|
* @param amount the number of required resources
|
||||||
* @return true if there are enough available resources, otherwise false
|
* @return true if there are enough available resources, otherwise false
|
||||||
*/
|
*/
|
||||||
public synchronized boolean canExecute(final long now, final long lastTs, final long amount) {
|
public synchronized boolean canExecute(final long amount) {
|
||||||
return avail >= amount ? true : refill(now, lastTs) >= amount;
|
long refillAmount = refill(limit, avail);
|
||||||
|
if (refillAmount == 0 && avail < amount) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// check for positive overflow
|
||||||
|
if (avail <= Long.MAX_VALUE - refillAmount) {
|
||||||
|
avail = Math.max(0, Math.min(avail + refillAmount, limit));
|
||||||
|
} else {
|
||||||
|
avail = Math.max(0, limit);
|
||||||
|
}
|
||||||
|
if (avail >= amount) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -134,6 +171,9 @@ public class RateLimiter {
|
||||||
*/
|
*/
|
||||||
public synchronized void consume(final long amount) {
|
public synchronized void consume(final long amount) {
|
||||||
this.avail -= amount;
|
this.avail -= amount;
|
||||||
|
if (this.avail < 0) {
|
||||||
|
this.avail = 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -148,18 +188,16 @@ public class RateLimiter {
|
||||||
*/
|
*/
|
||||||
public synchronized long waitInterval(final long amount) {
|
public synchronized long waitInterval(final long amount) {
|
||||||
// TODO Handle over quota?
|
// TODO Handle over quota?
|
||||||
return (amount <= avail) ? 0 : ((amount * tunit) / limit) - ((avail * tunit) / limit);
|
return (amount <= avail) ? 0 : getWaitInterval(limit, avail, amount);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
// This method is for strictly testing purpose only
|
||||||
* given the specified time interval, refill the avilable units to the proportionate to elapsed
|
@VisibleForTesting
|
||||||
* time or to the prespecified limit.
|
public void setNextRefillTime(long nextRefillTime) {
|
||||||
*/
|
this.setNextRefillTime(nextRefillTime);
|
||||||
private long refill(final long now, final long lastTs) {
|
|
||||||
long delta = (limit * (now - lastTs)) / tunit;
|
|
||||||
if (delta > 0) {
|
|
||||||
avail = Math.min(limit, avail + delta);
|
|
||||||
}
|
}
|
||||||
return avail;
|
|
||||||
|
public long getNextRefillTime() {
|
||||||
|
return this.getNextRefillTime();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,16 +1,26 @@
|
||||||
/**
|
/**
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
* distributed with this work for additional information
|
||||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
* to you under the Apache License, Version 2.0 (the
|
||||||
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
* "License"); you may not use this file except in compliance
|
||||||
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
* with the License. You may obtain a copy of the License at
|
||||||
* for the specific language governing permissions and limitations under the License.
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.quotas;
|
package org.apache.hadoop.hbase.quotas;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
|
@ -18,26 +28,40 @@ import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota;
|
import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota;
|
||||||
import org.apache.hadoop.hbase.quotas.OperationQuota.AvgOperationSize;
|
import org.apache.hadoop.hbase.quotas.OperationQuota.AvgOperationSize;
|
||||||
import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
|
import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Simple time based limiter that checks the quota Throttle
|
* Simple time based limiter that checks the quota Throttle
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public final class TimeBasedLimiter implements QuotaLimiter {
|
public class TimeBasedLimiter implements QuotaLimiter {
|
||||||
private long writeLastTs = 0;
|
private static final Configuration conf = HBaseConfiguration.create();
|
||||||
private long readLastTs = 0;
|
private RateLimiter reqsLimiter = null;
|
||||||
|
private RateLimiter reqSizeLimiter = null;
|
||||||
private RateLimiter reqsLimiter = new RateLimiter();
|
private RateLimiter writeReqsLimiter = null;
|
||||||
private RateLimiter reqSizeLimiter = new RateLimiter();
|
private RateLimiter writeSizeLimiter = null;
|
||||||
private RateLimiter writeReqsLimiter = new RateLimiter();
|
private RateLimiter readReqsLimiter = null;
|
||||||
private RateLimiter writeSizeLimiter = new RateLimiter();
|
private RateLimiter readSizeLimiter = null;
|
||||||
private RateLimiter readReqsLimiter = new RateLimiter();
|
|
||||||
private RateLimiter readSizeLimiter = new RateLimiter();
|
|
||||||
private AvgOperationSize avgOpSize = new AvgOperationSize();
|
private AvgOperationSize avgOpSize = new AvgOperationSize();
|
||||||
|
|
||||||
private TimeBasedLimiter() {
|
private TimeBasedLimiter() {
|
||||||
|
if (FixedIntervalRateLimiter.class.getName().equals(
|
||||||
|
conf.getClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY, AverageIntervalRateLimiter.class)
|
||||||
|
.getName())) {
|
||||||
|
reqsLimiter = new FixedIntervalRateLimiter();
|
||||||
|
reqSizeLimiter = new FixedIntervalRateLimiter();
|
||||||
|
writeReqsLimiter = new FixedIntervalRateLimiter();
|
||||||
|
writeSizeLimiter = new FixedIntervalRateLimiter();
|
||||||
|
readReqsLimiter = new FixedIntervalRateLimiter();
|
||||||
|
readSizeLimiter = new FixedIntervalRateLimiter();
|
||||||
|
} else {
|
||||||
|
reqsLimiter = new AverageIntervalRateLimiter();
|
||||||
|
reqSizeLimiter = new AverageIntervalRateLimiter();
|
||||||
|
writeReqsLimiter = new AverageIntervalRateLimiter();
|
||||||
|
writeSizeLimiter = new AverageIntervalRateLimiter();
|
||||||
|
readReqsLimiter = new AverageIntervalRateLimiter();
|
||||||
|
readSizeLimiter = new AverageIntervalRateLimiter();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static QuotaLimiter fromThrottle(final Throttle throttle) {
|
static QuotaLimiter fromThrottle(final Throttle throttle) {
|
||||||
|
@ -90,31 +114,28 @@ public final class TimeBasedLimiter implements QuotaLimiter {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void checkQuota(long writeSize, long readSize) throws ThrottlingException {
|
public void checkQuota(long writeSize, long readSize) throws ThrottlingException {
|
||||||
long now = EnvironmentEdgeManager.currentTime();
|
if (!reqsLimiter.canExecute()) {
|
||||||
long lastTs = Math.max(readLastTs, writeLastTs);
|
|
||||||
|
|
||||||
if (!reqsLimiter.canExecute(now, lastTs)) {
|
|
||||||
ThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval());
|
ThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval());
|
||||||
}
|
}
|
||||||
if (!reqSizeLimiter.canExecute(now, lastTs, writeSize + readSize)) {
|
if (!reqSizeLimiter.canExecute(writeSize + readSize)) {
|
||||||
ThrottlingException.throwNumRequestsExceeded(reqSizeLimiter
|
ThrottlingException.throwRequestSizeExceeded(reqSizeLimiter
|
||||||
.waitInterval(writeSize + readSize));
|
.waitInterval(writeSize + readSize));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (writeSize > 0) {
|
if (writeSize > 0) {
|
||||||
if (!writeReqsLimiter.canExecute(now, writeLastTs)) {
|
if (!writeReqsLimiter.canExecute()) {
|
||||||
ThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval());
|
ThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval());
|
||||||
}
|
}
|
||||||
if (!writeSizeLimiter.canExecute(now, writeLastTs, writeSize)) {
|
if (!writeSizeLimiter.canExecute(writeSize)) {
|
||||||
ThrottlingException.throwWriteSizeExceeded(writeSizeLimiter.waitInterval(writeSize));
|
ThrottlingException.throwWriteSizeExceeded(writeSizeLimiter.waitInterval(writeSize));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (readSize > 0) {
|
if (readSize > 0) {
|
||||||
if (!readReqsLimiter.canExecute(now, readLastTs)) {
|
if (!readReqsLimiter.canExecute()) {
|
||||||
ThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval());
|
ThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval());
|
||||||
}
|
}
|
||||||
if (!readSizeLimiter.canExecute(now, readLastTs, readSize)) {
|
if (!readSizeLimiter.canExecute(readSize)) {
|
||||||
ThrottlingException.throwReadSizeExceeded(readSizeLimiter.waitInterval(readSize));
|
ThrottlingException.throwReadSizeExceeded(readSizeLimiter.waitInterval(readSize));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -124,20 +145,16 @@ public final class TimeBasedLimiter implements QuotaLimiter {
|
||||||
public void grabQuota(long writeSize, long readSize) {
|
public void grabQuota(long writeSize, long readSize) {
|
||||||
assert writeSize != 0 || readSize != 0;
|
assert writeSize != 0 || readSize != 0;
|
||||||
|
|
||||||
long now = EnvironmentEdgeManager.currentTime();
|
|
||||||
|
|
||||||
reqsLimiter.consume(1);
|
reqsLimiter.consume(1);
|
||||||
reqSizeLimiter.consume(writeSize + readSize);
|
reqSizeLimiter.consume(writeSize + readSize);
|
||||||
|
|
||||||
if (writeSize > 0) {
|
if (writeSize > 0) {
|
||||||
writeReqsLimiter.consume(1);
|
writeReqsLimiter.consume(1);
|
||||||
writeSizeLimiter.consume(writeSize);
|
writeSizeLimiter.consume(writeSize);
|
||||||
writeLastTs = now;
|
|
||||||
}
|
}
|
||||||
if (readSize > 0) {
|
if (readSize > 0) {
|
||||||
readReqsLimiter.consume(1);
|
readReqsLimiter.consume(1);
|
||||||
readSizeLimiter.consume(readSize);
|
readSizeLimiter.consume(readSize);
|
||||||
readLastTs = now;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -48,16 +48,14 @@ public class TestRateLimiter {
|
||||||
|
|
||||||
private void testWaitInterval(final TimeUnit timeUnit, final long limit,
|
private void testWaitInterval(final TimeUnit timeUnit, final long limit,
|
||||||
final long expectedWaitInterval) {
|
final long expectedWaitInterval) {
|
||||||
RateLimiter limiter = new RateLimiter();
|
RateLimiter limiter = new AverageIntervalRateLimiter();
|
||||||
limiter.set(limit, timeUnit);
|
limiter.set(limit, timeUnit);
|
||||||
|
|
||||||
long nowTs = 0;
|
long nowTs = 0;
|
||||||
long lastTs = 0;
|
|
||||||
|
|
||||||
// consume all the available resources, one request at the time.
|
// consume all the available resources, one request at the time.
|
||||||
// the wait interval should be 0
|
// the wait interval should be 0
|
||||||
for (int i = 0; i < (limit - 1); ++i) {
|
for (int i = 0; i < (limit - 1); ++i) {
|
||||||
assertTrue(limiter.canExecute(nowTs, lastTs));
|
assertTrue(limiter.canExecute());
|
||||||
limiter.consume();
|
limiter.consume();
|
||||||
long waitInterval = limiter.waitInterval();
|
long waitInterval = limiter.waitInterval();
|
||||||
assertEquals(0, waitInterval);
|
assertEquals(0, waitInterval);
|
||||||
|
@ -66,40 +64,102 @@ public class TestRateLimiter {
|
||||||
for (int i = 0; i < (limit * 4); ++i) {
|
for (int i = 0; i < (limit * 4); ++i) {
|
||||||
// There is one resource available, so we should be able to
|
// There is one resource available, so we should be able to
|
||||||
// consume it without waiting.
|
// consume it without waiting.
|
||||||
assertTrue(limiter.canExecute(nowTs, lastTs));
|
limiter.setNextRefillTime(limiter.getNextRefillTime() - nowTs);
|
||||||
|
assertTrue(limiter.canExecute());
|
||||||
assertEquals(0, limiter.waitInterval());
|
assertEquals(0, limiter.waitInterval());
|
||||||
limiter.consume();
|
limiter.consume();
|
||||||
lastTs = nowTs;
|
|
||||||
|
|
||||||
// No more resources are available, we should wait for at least an interval.
|
// No more resources are available, we should wait for at least an interval.
|
||||||
long waitInterval = limiter.waitInterval();
|
long waitInterval = limiter.waitInterval();
|
||||||
assertEquals(expectedWaitInterval, waitInterval);
|
assertEquals(expectedWaitInterval, waitInterval);
|
||||||
|
|
||||||
// set the nowTs to be the exact time when resources should be available again.
|
// set the nowTs to be the exact time when resources should be available again.
|
||||||
nowTs += waitInterval;
|
nowTs = waitInterval;
|
||||||
|
|
||||||
// artificially go into the past to prove that when too early we should fail.
|
// artificially go into the past to prove that when too early we should fail.
|
||||||
assertFalse(limiter.canExecute(nowTs - 500, lastTs));
|
long temp = nowTs + 500;
|
||||||
|
limiter.setNextRefillTime(limiter.getNextRefillTime() + temp);
|
||||||
|
assertFalse(limiter.canExecute());
|
||||||
|
//Roll back the nextRefillTime set to continue further testing
|
||||||
|
limiter.setNextRefillTime(limiter.getNextRefillTime() - temp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testOverconsumption() {
|
public void testOverconsumptionAverageIntervalRefillStrategy() {
|
||||||
RateLimiter limiter = new RateLimiter();
|
RateLimiter limiter = new AverageIntervalRateLimiter();
|
||||||
limiter.set(10, TimeUnit.SECONDS);
|
limiter.set(10, TimeUnit.SECONDS);
|
||||||
|
|
||||||
// 10 resources are available, but we need to consume 20 resources
|
// 10 resources are available, but we need to consume 20 resources
|
||||||
// Verify that we have to wait at least 1.1sec to have 1 resource available
|
// Verify that we have to wait at least 1.1sec to have 1 resource available
|
||||||
assertTrue(limiter.canExecute(0, 0));
|
assertTrue(limiter.canExecute());
|
||||||
limiter.consume(20);
|
limiter.consume(20);
|
||||||
assertEquals(1100, limiter.waitInterval());
|
// To consume 1 resource wait for 100ms
|
||||||
|
assertEquals(100, limiter.waitInterval(1));
|
||||||
|
// To consume 10 resource wait for 1000ms
|
||||||
|
assertEquals(1000, limiter.waitInterval(10));
|
||||||
|
|
||||||
// Verify that after 1sec we need to wait for another 0.1sec to get a resource available
|
limiter.setNextRefillTime(limiter.getNextRefillTime() - 900);
|
||||||
assertFalse(limiter.canExecute(1000, 0));
|
// Verify that after 1sec the 1 resource is available
|
||||||
assertEquals(100, limiter.waitInterval());
|
assertTrue(limiter.canExecute(1));
|
||||||
|
limiter.setNextRefillTime(limiter.getNextRefillTime() - 100);
|
||||||
// Verify that after 1.1sec the resource is available
|
// Verify that after 1sec the 10 resource is available
|
||||||
assertTrue(limiter.canExecute(1100, 0));
|
assertTrue(limiter.canExecute());
|
||||||
assertEquals(0, limiter.waitInterval());
|
assertEquals(0, limiter.waitInterval());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOverconsumptionFixedIntervalRefillStrategy() throws InterruptedException {
|
||||||
|
RateLimiter limiter = new FixedIntervalRateLimiter();
|
||||||
|
limiter.set(10, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
// 10 resources are available, but we need to consume 20 resources
|
||||||
|
// Verify that we have to wait at least 1.1sec to have 1 resource available
|
||||||
|
assertTrue(limiter.canExecute());
|
||||||
|
limiter.consume(20);
|
||||||
|
// To consume 1 resource also wait for 1000ms
|
||||||
|
assertEquals(1000, limiter.waitInterval(1));
|
||||||
|
// To consume 10 resource wait for 100ms
|
||||||
|
assertEquals(1000, limiter.waitInterval(10));
|
||||||
|
|
||||||
|
limiter.setNextRefillTime(limiter.getNextRefillTime() - 900);
|
||||||
|
// Verify that after 1sec also no resource should be available
|
||||||
|
assertFalse(limiter.canExecute(1));
|
||||||
|
limiter.setNextRefillTime(limiter.getNextRefillTime() - 100);
|
||||||
|
|
||||||
|
// Verify that after 1sec the 10 resource is available
|
||||||
|
assertTrue(limiter.canExecute());
|
||||||
|
assertEquals(0, limiter.waitInterval());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFixedIntervalResourceAvailability() throws Exception {
|
||||||
|
RateLimiter limiter = new FixedIntervalRateLimiter();
|
||||||
|
limiter.set(10, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
assertTrue(limiter.canExecute(10));
|
||||||
|
limiter.consume(3);
|
||||||
|
assertEquals(7, limiter.getAvailable());
|
||||||
|
assertFalse(limiter.canExecute(10));
|
||||||
|
limiter.setNextRefillTime(limiter.getNextRefillTime() - 3);
|
||||||
|
assertTrue(limiter.canExecute(10));
|
||||||
|
assertEquals(10, limiter.getAvailable());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLimiterBySmallerRate() throws InterruptedException {
|
||||||
|
// set limiter is 10 resources per seconds
|
||||||
|
RateLimiter limiter = new FixedIntervalRateLimiter();
|
||||||
|
limiter.set(10, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
int count = 0; // control the test count
|
||||||
|
while ((count++) < 10) {
|
||||||
|
// test will get 3 resources per 0.5 sec. so it will get 6 resources per sec.
|
||||||
|
limiter.setNextRefillTime(limiter.getNextRefillTime() - 500);
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
// 6 resources/sec < limit, so limiter.canExecute(nowTs, lastTs) should be true
|
||||||
|
assertEquals(true, limiter.canExecute());
|
||||||
|
limiter.consume();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue