diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java new file mode 100644 index 00000000000..a0cd71ba3f9 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import com.google.common.annotations.VisibleForTesting; + +/** + * This limiter will refill resources at every TimeUnit/resources interval. For example: For a + * limiter configured with 10resources/second, then 1 resource will be refilled after every 100ms + * (1sec/10resources) + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class AverageIntervalRateLimiter extends RateLimiter { + private long nextRefillTime = -1L; + + @Override + public long refill(long limit, long available) { + final long now = EnvironmentEdgeManager.currentTime(); + if (nextRefillTime == -1) { + // Till now no resource has been consumed. + nextRefillTime = EnvironmentEdgeManager.currentTime(); + return limit; + } + + long delta = (limit * (now - nextRefillTime)) / super.getTimeUnitInMillis(); + if (delta > 0) { + this.nextRefillTime = now; + return Math.min(limit, available + delta); + } + return 0; + } + + @Override + public long getWaitInterval(long limit, long available, long amount) { + if (nextRefillTime == -1) { + return 0; + } + long timeUnitInMillis = super.getTimeUnitInMillis(); + return ((amount * timeUnitInMillis) / limit) - ((available * timeUnitInMillis) / limit); + } + + // This method is for strictly testing purpose only + @VisibleForTesting + public void setNextRefillTime(long nextRefillTime) { + this.nextRefillTime = nextRefillTime; + } + + @VisibleForTesting + public long getNextRefillTime() { + return this.nextRefillTime; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FixedIntervalRateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FixedIntervalRateLimiter.java new file mode 100644 index 00000000000..0b05798c1ba --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FixedIntervalRateLimiter.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable + * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" + * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License + * for the specific language governing permissions and limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import com.google.common.annotations.VisibleForTesting; + +/** + * With this limiter resources will be refilled only after a fixed interval of time. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class FixedIntervalRateLimiter extends RateLimiter { + private long nextRefillTime = -1L; + + @Override + public long refill(long limit, long available) { + final long now = EnvironmentEdgeManager.currentTime(); + if (now < nextRefillTime) { + return 0; + } + nextRefillTime = now + super.getTimeUnitInMillis(); + return limit; + } + + @Override + public long getWaitInterval(long limit, long available, long amount) { + if (nextRefillTime == -1) { + return 0; + } + final long now = EnvironmentEdgeManager.currentTime(); + final long refillTime = nextRefillTime; + return refillTime - now; + } + + // This method is for strictly testing purpose only + @VisibleForTesting + public void setNextRefillTime(long nextRefillTime) { + this.nextRefillTime = nextRefillTime; + } + + @VisibleForTesting + public long getNextRefillTime() { + return this.nextRefillTime; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java index 1806cc3f3b0..e32649d6b49 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java @@ -23,19 +23,20 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import com.google.common.annotations.VisibleForTesting; + /** * Simple rate limiter. * * Usage Example: - * RateLimiter limiter = new RateLimiter(); // At this point you have a unlimited resource limiter + * // At this point you have a unlimited resource limiter + * RateLimiter limiter = new AverageIntervalRateLimiter(); + * or new FixedIntervalRateLimiter(); * limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec * - * long lastTs = 0; // You need to keep track of the last update timestamp * while (true) { - * long now = System.currentTimeMillis(); - * * // call canExecute before performing resource consuming operation - * bool canExecute = limiter.canExecute(now, lastTs); + * bool canExecute = limiter.canExecute(); * // If there are no available resources, wait until one is available * if (!canExecute) Thread.sleep(limiter.waitInterval()); * // ...execute the work and consume the resource... @@ -44,13 +45,28 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class RateLimiter { +public abstract class RateLimiter { + public static final String QUOTA_RATE_LIMITER_CONF_KEY = "hbase.quota.rate.limiter"; private long tunit = 1000; // Timeunit factor for translating to ms. private long limit = Long.MAX_VALUE; // The max value available resource units can be refilled to. private long avail = Long.MAX_VALUE; // Currently available resource units - public RateLimiter() { - } + /** + * Refill the available units w.r.t the elapsed time. + * @param limit Maximum available resource units that can be refilled to. + * @param available Currently available resource units + */ + abstract long refill(long limit, long available); + + /** + * Time in milliseconds to wait for before requesting to consume 'amount' resource. + * @param limit Maximum available resource units that can be refilled to. + * @param available Currently available resource units + * @param amount Resources for which time interval to calculate for + * @return estimate of the ms required to wait before being able to provide 'amount' resources. + */ + abstract long getWaitInterval(long limit, long available, long amount); + /** * Set the RateLimiter max available resources and refill period. @@ -59,35 +75,34 @@ public class RateLimiter { */ public void set(final long limit, final TimeUnit timeUnit) { switch (timeUnit) { - case NANOSECONDS: - throw new RuntimeException("Unsupported NANOSECONDS TimeUnit"); - case MICROSECONDS: - throw new RuntimeException("Unsupported MICROSECONDS TimeUnit"); - case MILLISECONDS: - tunit = 1; - break; - case SECONDS: - tunit = 1000; - break; - case MINUTES: - tunit = 60 * 1000; - break; - case HOURS: - tunit = 60 * 60 * 1000; - break; - case DAYS: - tunit = 24 * 60 * 60 * 1000; - break; + case MILLISECONDS: + tunit = 1; + break; + case SECONDS: + tunit = 1000; + break; + case MINUTES: + tunit = 60 * 1000; + break; + case HOURS: + tunit = 60 * 60 * 1000; + break; + case DAYS: + tunit = 24 * 60 * 60 * 1000; + break; + default: + throw new RuntimeException("Unsupported " + timeUnit.name() + " TimeUnit."); } this.limit = limit; this.avail = limit; } public String toString() { + String rateLimiter = this.getClass().getSimpleName(); if (limit == Long.MAX_VALUE) { - return "RateLimiter(Bypass)"; + return rateLimiter + "(Bypass)"; } - return "RateLimiter(avail=" + avail + " limit=" + limit + " tunit=" + tunit + ")"; + return rateLimiter + "(avail=" + avail + " limit=" + limit + " tunit=" + tunit + ")"; } /** @@ -116,25 +131,38 @@ public class RateLimiter { return avail; } - /** - * given the time interval, is there at least one resource available to allow execution? - * @param now the current timestamp - * @param lastTs the timestamp of the last update - * @return true if there is at least one resource available, otherwise false - */ - public boolean canExecute(final long now, final long lastTs) { - return canExecute(now, lastTs, 1); + protected long getTimeUnitInMillis() { + return tunit; } /** - * given the time interval, are there enough available resources to allow execution? - * @param now the current timestamp - * @param lastTs the timestamp of the last update + * Is there at least one resource available to allow execution? + * @return true if there is at least one resource available, otherwise false + */ + public boolean canExecute() { + return canExecute(1); + } + + /** + * Are there enough available resources to allow execution? * @param amount the number of required resources * @return true if there are enough available resources, otherwise false */ - public synchronized boolean canExecute(final long now, final long lastTs, final long amount) { - return avail >= amount ? true : refill(now, lastTs) >= amount; + public synchronized boolean canExecute(final long amount) { + long refillAmount = refill(limit, avail); + if (refillAmount == 0 && avail < amount) { + return false; + } + // check for positive overflow + if (avail <= Long.MAX_VALUE - refillAmount) { + avail = Math.max(0, Math.min(avail + refillAmount, limit)); + } else { + avail = Math.max(0, limit); + } + if (avail >= amount) { + return true; + } + return false; } /** @@ -150,6 +178,9 @@ public class RateLimiter { */ public synchronized void consume(final long amount) { this.avail -= amount; + if (this.avail < 0) { + this.avail = 0; + } } /** @@ -164,18 +195,16 @@ public class RateLimiter { */ public synchronized long waitInterval(final long amount) { // TODO Handle over quota? - return (amount <= avail) ? 0 : ((amount * tunit) / limit) - ((avail * tunit) / limit); + return (amount <= avail) ? 0 : getWaitInterval(limit, avail, amount); } - /** - * given the specified time interval, refill the avilable units to the proportionate - * to elapsed time or to the prespecified limit. - */ - private long refill(final long now, final long lastTs) { - long delta = (limit * (now - lastTs)) / tunit; - if (delta > 0) { - avail = Math.min(limit, avail + delta); - } - return avail; + // This method is for strictly testing purpose only + @VisibleForTesting + public void setNextRefillTime(long nextRefillTime) { + this.setNextRefillTime(nextRefillTime); + } + + public long getNextRefillTime() { + return this.getNextRefillTime(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java index 79687a9a209..beb4147643a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.quotas; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -26,7 +28,6 @@ import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle; import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota; import org.apache.hadoop.hbase.quotas.OperationQuota.AvgOperationSize; import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * Simple time based limiter that checks the quota Throttle @@ -34,18 +35,33 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @InterfaceAudience.Private @InterfaceStability.Evolving public class TimeBasedLimiter implements QuotaLimiter { - private long writeLastTs = 0; - private long readLastTs = 0; - - private RateLimiter reqsLimiter = new RateLimiter(); - private RateLimiter reqSizeLimiter = new RateLimiter(); - private RateLimiter writeReqsLimiter = new RateLimiter(); - private RateLimiter writeSizeLimiter = new RateLimiter(); - private RateLimiter readReqsLimiter = new RateLimiter(); - private RateLimiter readSizeLimiter = new RateLimiter(); + private static final Configuration conf = HBaseConfiguration.create(); + private RateLimiter reqsLimiter = null; + private RateLimiter reqSizeLimiter = null; + private RateLimiter writeReqsLimiter = null; + private RateLimiter writeSizeLimiter = null; + private RateLimiter readReqsLimiter = null; + private RateLimiter readSizeLimiter = null; private AvgOperationSize avgOpSize = new AvgOperationSize(); private TimeBasedLimiter() { + if (FixedIntervalRateLimiter.class.getName().equals( + conf.getClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY, AverageIntervalRateLimiter.class) + .getName())) { + reqsLimiter = new FixedIntervalRateLimiter(); + reqSizeLimiter = new FixedIntervalRateLimiter(); + writeReqsLimiter = new FixedIntervalRateLimiter(); + writeSizeLimiter = new FixedIntervalRateLimiter(); + readReqsLimiter = new FixedIntervalRateLimiter(); + readSizeLimiter = new FixedIntervalRateLimiter(); + } else { + reqsLimiter = new AverageIntervalRateLimiter(); + reqSizeLimiter = new AverageIntervalRateLimiter(); + writeReqsLimiter = new AverageIntervalRateLimiter(); + writeSizeLimiter = new AverageIntervalRateLimiter(); + readReqsLimiter = new AverageIntervalRateLimiter(); + readSizeLimiter = new AverageIntervalRateLimiter(); + } } static QuotaLimiter fromThrottle(final Throttle throttle) { @@ -97,33 +113,29 @@ public class TimeBasedLimiter implements QuotaLimiter { } @Override - public void checkQuota(long writeSize, long readSize) - throws ThrottlingException { - long now = EnvironmentEdgeManager.currentTime(); - long lastTs = Math.max(readLastTs, writeLastTs); - - if (!reqsLimiter.canExecute(now, lastTs)) { + public void checkQuota(long writeSize, long readSize) throws ThrottlingException { + if (!reqsLimiter.canExecute()) { ThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval()); } - if (!reqSizeLimiter.canExecute(now, lastTs, writeSize + readSize)) { + if (!reqSizeLimiter.canExecute(writeSize + readSize)) { ThrottlingException.throwRequestSizeExceeded(reqSizeLimiter .waitInterval(writeSize + readSize)); } if (writeSize > 0) { - if (!writeReqsLimiter.canExecute(now, writeLastTs)) { + if (!writeReqsLimiter.canExecute()) { ThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval()); } - if (!writeSizeLimiter.canExecute(now, writeLastTs, writeSize)) { + if (!writeSizeLimiter.canExecute(writeSize)) { ThrottlingException.throwWriteSizeExceeded(writeSizeLimiter.waitInterval(writeSize)); } } if (readSize > 0) { - if (!readReqsLimiter.canExecute(now, readLastTs)) { + if (!readReqsLimiter.canExecute()) { ThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval()); } - if (!readSizeLimiter.canExecute(now, readLastTs, readSize)) { + if (!readSizeLimiter.canExecute(readSize)) { ThrottlingException.throwReadSizeExceeded(readSizeLimiter.waitInterval(readSize)); } } @@ -133,20 +145,16 @@ public class TimeBasedLimiter implements QuotaLimiter { public void grabQuota(long writeSize, long readSize) { assert writeSize != 0 || readSize != 0; - long now = EnvironmentEdgeManager.currentTime(); - reqsLimiter.consume(1); reqSizeLimiter.consume(writeSize + readSize); if (writeSize > 0) { writeReqsLimiter.consume(1); writeSizeLimiter.consume(writeSize); - writeLastTs = now; } if (readSize > 0) { readReqsLimiter.consume(1); readSizeLimiter.consume(readSize); - readLastTs = now; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java index 50897a28c1f..985add2a844 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java @@ -18,19 +18,17 @@ package org.apache.hadoop.hbase.quotas; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.testclassification.RegionServerTests; - -import org.junit.Assert; -import org.junit.Test; -import org.junit.experimental.categories.Category; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + /** * Verify the behaviour of the Rate Limiter. */ @@ -58,16 +56,14 @@ public class TestRateLimiter { private void testWaitInterval(final TimeUnit timeUnit, final long limit, final long expectedWaitInterval) { - RateLimiter limiter = new RateLimiter(); + RateLimiter limiter = new AverageIntervalRateLimiter(); limiter.set(limit, timeUnit); long nowTs = 0; - long lastTs = 0; - // consume all the available resources, one request at the time. // the wait interval should be 0 for (int i = 0; i < (limit - 1); ++i) { - assertTrue(limiter.canExecute(nowTs, lastTs)); + assertTrue(limiter.canExecute()); limiter.consume(); long waitInterval = limiter.waitInterval(); assertEquals(0, waitInterval); @@ -76,40 +72,102 @@ public class TestRateLimiter { for (int i = 0; i < (limit * 4); ++i) { // There is one resource available, so we should be able to // consume it without waiting. - assertTrue(limiter.canExecute(nowTs, lastTs)); + limiter.setNextRefillTime(limiter.getNextRefillTime() - nowTs); + assertTrue(limiter.canExecute()); assertEquals(0, limiter.waitInterval()); limiter.consume(); - lastTs = nowTs; - // No more resources are available, we should wait for at least an interval. long waitInterval = limiter.waitInterval(); assertEquals(expectedWaitInterval, waitInterval); // set the nowTs to be the exact time when resources should be available again. - nowTs += waitInterval; + nowTs = waitInterval; // artificially go into the past to prove that when too early we should fail. - assertFalse(limiter.canExecute(nowTs - 500, lastTs)); + long temp = nowTs + 500; + limiter.setNextRefillTime(limiter.getNextRefillTime() + temp); + assertFalse(limiter.canExecute()); + //Roll back the nextRefillTime set to continue further testing + limiter.setNextRefillTime(limiter.getNextRefillTime() - temp); } } @Test - public void testOverconsumption() { - RateLimiter limiter = new RateLimiter(); + public void testOverconsumptionAverageIntervalRefillStrategy() { + RateLimiter limiter = new AverageIntervalRateLimiter(); limiter.set(10, TimeUnit.SECONDS); // 10 resources are available, but we need to consume 20 resources // Verify that we have to wait at least 1.1sec to have 1 resource available - assertTrue(limiter.canExecute(0, 0)); + assertTrue(limiter.canExecute()); limiter.consume(20); - assertEquals(1100, limiter.waitInterval()); + // To consume 1 resource wait for 100ms + assertEquals(100, limiter.waitInterval(1)); + // To consume 10 resource wait for 1000ms + assertEquals(1000, limiter.waitInterval(10)); - // Verify that after 1sec we need to wait for another 0.1sec to get a resource available - assertFalse(limiter.canExecute(1000, 0)); - assertEquals(100, limiter.waitInterval()); - - // Verify that after 1.1sec the resource is available - assertTrue(limiter.canExecute(1100, 0)); + limiter.setNextRefillTime(limiter.getNextRefillTime() - 900); + // Verify that after 1sec the 1 resource is available + assertTrue(limiter.canExecute(1)); + limiter.setNextRefillTime(limiter.getNextRefillTime() - 100); + // Verify that after 1sec the 10 resource is available + assertTrue(limiter.canExecute()); assertEquals(0, limiter.waitInterval()); } + + @Test + public void testOverconsumptionFixedIntervalRefillStrategy() throws InterruptedException { + RateLimiter limiter = new FixedIntervalRateLimiter(); + limiter.set(10, TimeUnit.SECONDS); + + // 10 resources are available, but we need to consume 20 resources + // Verify that we have to wait at least 1.1sec to have 1 resource available + assertTrue(limiter.canExecute()); + limiter.consume(20); + // To consume 1 resource also wait for 1000ms + assertEquals(1000, limiter.waitInterval(1)); + // To consume 10 resource wait for 100ms + assertEquals(1000, limiter.waitInterval(10)); + + limiter.setNextRefillTime(limiter.getNextRefillTime() - 900); + // Verify that after 1sec also no resource should be available + assertFalse(limiter.canExecute(1)); + limiter.setNextRefillTime(limiter.getNextRefillTime() - 100); + + // Verify that after 1sec the 10 resource is available + assertTrue(limiter.canExecute()); + assertEquals(0, limiter.waitInterval()); + } + + @Test + public void testFixedIntervalResourceAvailability() throws Exception { + RateLimiter limiter = new FixedIntervalRateLimiter(); + limiter.set(10, TimeUnit.MILLISECONDS); + + assertTrue(limiter.canExecute(10)); + limiter.consume(3); + assertEquals(7, limiter.getAvailable()); + assertFalse(limiter.canExecute(10)); + limiter.setNextRefillTime(limiter.getNextRefillTime() - 3); + assertTrue(limiter.canExecute(10)); + assertEquals(10, limiter.getAvailable()); + } + + @Test + public void testLimiterBySmallerRate() throws InterruptedException { + // set limiter is 10 resources per seconds + RateLimiter limiter = new FixedIntervalRateLimiter(); + limiter.set(10, TimeUnit.SECONDS); + + int count = 0; // control the test count + while ((count++) < 10) { + // test will get 3 resources per 0.5 sec. so it will get 6 resources per sec. + limiter.setNextRefillTime(limiter.getNextRefillTime() - 500); + for (int i = 0; i < 3; i++) { + // 6 resources/sec < limit, so limiter.canExecute(nowTs, lastTs) should be true + assertEquals(true, limiter.canExecute()); + limiter.consume(); + } + } + } } \ No newline at end of file