diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 2f47ada3055..aff6b8bccad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -498,6 +498,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.datanode.metrics.logger.period.seconds"; public static final int DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT = 600; + /** + * The maximum number of getBlocks RPCs data movement utilities can make to + * a NameNode per second. Values <= 0 disable throttling. This affects + * anything that uses a NameNodeConnector, i.e., the Balancer, Mover, + * and StoragePolicySatisfier. + */ + public static final String DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY = "dfs.namenode.get-blocks.max-qps"; + public static final int DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT = 20; public static final String DFS_BALANCER_MOVEDWINWIDTH_KEY = "dfs.balancer.movedWinWidth"; public static final long DFS_BALANCER_MOVEDWINWIDTH_DEFAULT = 5400*1000L; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index 444984b6c9a..d5747503c2a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -821,10 +821,8 @@ public class Dispatcher { * move tasks or it has received enough blocks from the namenode, or the * elapsed time of the iteration has exceeded the max time limit. * - * @param delay - time to sleep before sending getBlocks. Intended to - * disperse Balancer RPCs to NameNode for large clusters. See HDFS-11384. */ - private void dispatchBlocks(long delay) { + private void dispatchBlocks() { this.blocksToReceive = 2 * getScheduledSize(); long previousMoveTimestamp = Time.monotonicNow(); while (getScheduledSize() > 0 && !isIterationOver() @@ -849,25 +847,15 @@ public class Dispatcher { if (shouldFetchMoreBlocks()) { // fetch new blocks try { - if(delay > 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Sleeping " + delay + " msec."); - } - Thread.sleep(delay); - } final long received = getBlockList(); if (received == 0) { return; } blocksToReceive -= received; continue; - } catch (InterruptedException ignored) { - // nothing to do } catch (IOException e) { LOG.warn("Exception while getting block list", e); return; - } finally { - delay = 0L; } } else { // jump out of while-loop after the configured timeout. @@ -1059,12 +1047,6 @@ public class Dispatcher { return nnc.shouldContinue(dispatchBlockMoves()); } - /** - * The best-effort limit on the number of RPCs per second - * the Balancer will send to the NameNode. - */ - final static int BALANCER_NUM_RPC_PER_SEC = 20; - /** * Dispatch block moves for each source. The thread selects blocks to move & * sends request to proxy source to initiate block move. The process is flow @@ -1080,12 +1062,7 @@ public class Dispatcher { int concurrentThreads = Math.min(sources.size(), ((ThreadPoolExecutor)dispatchExecutor).getCorePoolSize()); assert concurrentThreads > 0 : "Number of concurrent threads is 0."; - if (LOG.isDebugEnabled()) { - LOG.debug("Balancer allowed RPCs per sec = " + BALANCER_NUM_RPC_PER_SEC); - LOG.debug("Balancer concurrent threads = " + concurrentThreads); - LOG.debug("Disperse Interval sec = " + - concurrentThreads / BALANCER_NUM_RPC_PER_SEC); - } + LOG.debug("Balancer concurrent dispatcher threads " + concurrentThreads); // Determine the size of each mover thread pool per target int threadsPerTarget = maxMoverThreads/targets.size(); @@ -1105,23 +1082,15 @@ public class Dispatcher { LOG.info("Allocating " + threadsPerTarget + " threads per target."); } - long dSec = 0; final Iterator i = sources.iterator(); for (int j = 0; j < futures.length; j++) { final Source s = i.next(); - final long delay = dSec * 1000; futures[j] = dispatchExecutor.submit(new Runnable() { @Override public void run() { - s.dispatchBlocks(delay); + s.dispatchBlocks(); } }); - // Calculate delay in seconds for the next iteration - if(j >= concurrentThreads) { - dSec = 0; - } else if((j + 1) % BALANCER_NUM_RPC_PER_SEC == 0) { - dSec++; - } } // wait for all dispatcher threads to finish diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index a8f07031afb..589b53abe92 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; @@ -50,6 +51,7 @@ import org.apache.hadoop.hdfs.server.protocol.BalancerProtocols; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.util.RateLimiter; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; @@ -114,6 +116,7 @@ public class NameNodeConnector implements Closeable { private final int maxNotChangedIterations; private int notChangedIterations = 0; + private final RateLimiter getBlocksRateLimiter; public NameNodeConnector(String name, URI nameNodeUri, Path idPath, List targetPaths, Configuration conf, @@ -124,6 +127,16 @@ public class NameNodeConnector implements Closeable { this.targetPaths = targetPaths == null || targetPaths.isEmpty() ? Arrays .asList(new Path("/")) : targetPaths; this.maxNotChangedIterations = maxNotChangedIterations; + int getBlocksMaxQps = conf.getInt( + DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY, + DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT); + if (getBlocksMaxQps > 0) { + LOG.info("getBlocks calls for " + nameNodeUri + + " will be rate-limited to " + getBlocksMaxQps + " per second"); + this.getBlocksRateLimiter = RateLimiter.create(getBlocksMaxQps); + } else { + this.getBlocksRateLimiter = null; + } this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri, BalancerProtocols.class, fallbackToSimpleAuth).getProxy(); @@ -159,6 +172,9 @@ public class NameNodeConnector implements Closeable { /** @return blocks with locations. */ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) throws IOException { + if (getBlocksRateLimiter != null) { + getBlocksRateLimiter.acquire(); + } return namenode.getBlocks(datanode, size); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/RateLimiter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/RateLimiter.java new file mode 100644 index 00000000000..c7c5da5cfb8 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/RateLimiter.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.util; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.util.Timer; + +/** + * A rate limiter which loosely emulates the behavior of Guava's RateLimiter + * for branches which do not have that class available. The APIs are intended + * to match RateLimiter's to avoid code changes in calling classes when + * switching to Guava. + */ +public class RateLimiter { + + private final Timer timer; + /** The period that should elapse between any two operations. */ + private final long opDispersalPeriodNanos; + + /** The last time an operation completed, in system nanos (not wall time). */ + private final AtomicLong lastOpTimeNanos; + + public static RateLimiter create(double maxOpsPerSecond) { + return new RateLimiter(new Timer(), maxOpsPerSecond); + } + + RateLimiter(Timer timer, double maxOpsPerSecond) { + this.timer = timer; + if (maxOpsPerSecond <= 0) { + throw new IllegalArgumentException("RateLimiter max operations per " + + "second must be > 0 but was " + maxOpsPerSecond); + } + opDispersalPeriodNanos = + (long) (TimeUnit.SECONDS.toNanos(1) / maxOpsPerSecond); + lastOpTimeNanos = new AtomicLong(Long.MIN_VALUE); + } + + /** + * Attempt to acquire a permit to perform an operation. This will block until + * enough time has elapsed since the last operation to perform another. No + * fairness is provided; acquisition attempts will be serviced in an arbitrary + * order rather than FIFO. + * + * @return The time, in seconds, it took to acquire a permit. + */ + public double acquire() { + boolean interrupted = false; + long startTimeNanos = Long.MAX_VALUE; + try { + while (true) { + long currTimeNanos = timer.monotonicNowNanos(); + startTimeNanos = Math.min(currTimeNanos, startTimeNanos); + long lastOpTimeLocal = lastOpTimeNanos.get(); + long nextAllowedOpTime = lastOpTimeLocal + opDispersalPeriodNanos; + if (currTimeNanos >= nextAllowedOpTime) { + // enough time has elapsed; attempt to acquire the current permit + boolean acquired = + lastOpTimeNanos.compareAndSet(lastOpTimeLocal, currTimeNanos); + // if the CAS failed, another thread acquired the permit, try again + if (acquired) { + return (currTimeNanos - startTimeNanos) + / ((double) TimeUnit.SECONDS.toNanos(1)); + } + } else { + interrupted |= sleep(nextAllowedOpTime - currTimeNanos); + } + } + } finally { + if (interrupted) { + // allow other levels to be aware of the interrupt + Thread.currentThread().interrupt(); + } + } + } + + /** Sleep for some amount of nanoseconds. Returns true iff interrupted. */ + boolean sleep(long sleepTimeNanos) { + long sleepTimeMillis = TimeUnit.NANOSECONDS.toMillis(sleepTimeNanos); + try { + Thread.sleep(sleepTimeMillis, (int) (sleepTimeNanos + - TimeUnit.MILLISECONDS.toNanos(sleepTimeMillis))); + } catch (InterruptedException ie) { + // swallow and continue, but allow the interrupt to be remembered + return true; + } + return false; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index d5f61e82149..9c3ef5aa34b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -3369,7 +3369,16 @@ HTTPS port for DataNode. - + + dfs.namenode.get-blocks.max-qps + 20 + + The maximum number of getBlocks RPCs data movement utilities can make to + a NameNode per second. Values less than or equal to 0 disable throttling. + This affects anything that uses a NameNodeConnector, i.e., the Balancer, + Mover, and StoragePolicySatisfier. + + dfs.balancer.dispatcherThreads 200 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 29791113ce0..c8456b127b0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -70,6 +70,8 @@ import java.util.Properties; import java.util.Random; import java.util.Set; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -124,6 +126,7 @@ import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Tool; import org.apache.log4j.Level; import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -153,6 +156,16 @@ public class TestBalancer { private static MiniKdc kdc; private static File keytabFile; private MiniDFSCluster cluster; + private AtomicInteger numGetBlocksCalls; + private AtomicLong startGetBlocksTime; + private AtomicLong endGetBlocksTime; + + @Before + public void setup() { + numGetBlocksCalls = new AtomicInteger(0); + startGetBlocksTime = new AtomicLong(Long.MAX_VALUE); + endGetBlocksTime = new AtomicLong(Long.MIN_VALUE); + } static void initSecureConf(Configuration conf) throws Exception { baseDir = new File(System.getProperty("test.build.dir", "target/test-dir"), @@ -765,7 +778,7 @@ public class TestBalancer { long newCapacity, String newRack, NewNodeInfo nodes, boolean useTool, boolean useFile) throws Exception { doTest(conf, capacities, racks, newCapacity, newRack, nodes, - useTool, useFile, false); + useTool, useFile, false, 0.3); } /** This test start a cluster with specified number of nodes, @@ -784,12 +797,14 @@ public class TestBalancer { * @param useFile - if true, the hosts to included or excluded will be stored in a * file and then later read from the file. * @param useNamesystemSpy - spy on FSNamesystem if true + * @param clusterUtilization - The utilization of the cluster to start, from + * 0.0 to 1.0 * @throws Exception */ private void doTest(Configuration conf, long[] capacities, String[] racks, long newCapacity, String newRack, NewNodeInfo nodes, boolean useTool, boolean useFile, - boolean useNamesystemSpy) throws Exception { + boolean useNamesystemSpy, double clusterUtilization) throws Exception { LOG.info("capacities = " + long2String(capacities)); LOG.info("racks = " + Arrays.asList(racks)); LOG.info("newCapacity= " + newCapacity); @@ -819,8 +834,8 @@ public class TestBalancer { long totalCapacity = sum(capacities); - // fill up the cluster to be 30% full - long totalUsedSpace = totalCapacity*3/10; + // fill up the cluster to be `clusterUtilization` full + long totalUsedSpace = (long) (totalCapacity * clusterUtilization); createFile(cluster, filePath, totalUsedSpace / numOfDatanodes, (short) numOfDatanodes, 0); @@ -1938,33 +1953,48 @@ public class TestBalancer { } } - private static int numGetBlocksCalls; - private static long startGetBlocksTime, endGetBlocksTime; - private void spyFSNamesystem(NameNode nn) throws IOException { FSNamesystem fsnSpy = NameNodeAdapter.spyOnNamesystem(nn); - numGetBlocksCalls = 0; - endGetBlocksTime = startGetBlocksTime = Time.monotonicNow(); doAnswer(new Answer() { @Override public BlocksWithLocations answer(InvocationOnMock invocation) throws Throwable { + long startTime = Time.monotonicNow(); + setAtomicLongToMinMax(startGetBlocksTime, startTime, false); BlocksWithLocations blk = (BlocksWithLocations)invocation.callRealMethod(); - endGetBlocksTime = Time.monotonicNow(); - numGetBlocksCalls++; + long endTime = Time.monotonicNow(); + setAtomicLongToMinMax(endGetBlocksTime, endTime, true); + numGetBlocksCalls.incrementAndGet(); return blk; }}).when(fsnSpy).getBlocks(any(DatanodeID.class), anyLong()); } + private static void setAtomicLongToMinMax(AtomicLong value, long newVal, + boolean useMax) { + while (true) { + long currVal = value.get(); + if ((useMax && newVal > currVal) || (!useMax && newVal < currVal)) { + if (value.compareAndSet(currVal, newVal)) { + return; + } + } else { + return; + } + } + } + /** * Test that makes the Balancer to disperse RPCs to the NameNode - * in order to avoid NN's RPC queue saturation. + * in order to avoid NN's RPC queue saturation. This not marked as @Test + * because it is run from {@link TestBalancerRPCDelay}. */ - void testBalancerRPCDelay() throws Exception { + void testBalancerRPCDelay(int getBlocksMaxQps) throws Exception { final Configuration conf = new HdfsConfiguration(); initConf(conf); conf.setInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, 30); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY, + getBlocksMaxQps); int numDNs = 20; long[] capacities = new long[numDNs]; @@ -1974,16 +2004,22 @@ public class TestBalancer { racks[i] = (i < numDNs/2 ? RACK0 : RACK1); } doTest(conf, capacities, racks, CAPACITY, RACK2, - new PortNumberBasedNodes(3, 0, 0), false, false, true); + // Use only 1 node and set the starting capacity to 50% to allow the + // balancing to complete in only one iteration. This is necessary + // because the startGetBlocksTime and endGetBlocksTime measures across + // all get block calls, so if two iterations are performed, the duration + // also includes the time it took to perform the block move ops in the + // first iteration + new PortNumberBasedNodes(1, 0, 0), false, false, true, 0.5); assertTrue("Number of getBlocks should be not less than " + - Dispatcher.BALANCER_NUM_RPC_PER_SEC, - numGetBlocksCalls > Dispatcher.BALANCER_NUM_RPC_PER_SEC); - long d = 1 + endGetBlocksTime - startGetBlocksTime; - LOG.info("Balancer executed " + numGetBlocksCalls - + " getBlocks in " + d + " msec."); - assertTrue("Expected BALANCER_NUM_RPC_PER_SEC = " + - Dispatcher.BALANCER_NUM_RPC_PER_SEC, - (numGetBlocksCalls * 1000 / d) < Dispatcher.BALANCER_NUM_RPC_PER_SEC); + getBlocksMaxQps, numGetBlocksCalls.get() >= getBlocksMaxQps); + long durationMs = 1 + endGetBlocksTime.get() - startGetBlocksTime.get(); + int durationSec = (int) Math.ceil(durationMs / 1000.0); + LOG.info("Balancer executed " + numGetBlocksCalls.get() + " getBlocks in " + + durationMs + " msec (round up to " + durationSec + " sec)"); + long getBlockCallsPerSecond = numGetBlocksCalls.get() / durationSec; + assertTrue("Expected balancer getBlocks calls per second <= " + + getBlocksMaxQps, getBlockCallsPerSecond <= getBlocksMaxQps); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java index 960ad257663..79c7f87d4e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hdfs.server.balancer; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.junit.After; +import org.junit.Before; import org.junit.Test; /** @@ -25,8 +28,29 @@ import org.junit.Test; */ public class TestBalancerRPCDelay { + private TestBalancer testBalancer; + + @Before + public void setup() { + testBalancer = new TestBalancer(); + testBalancer.setup(); + } + + @After + public void teardown() throws Exception { + if (testBalancer != null) { + testBalancer.shutdown(); + } + } + @Test(timeout=100000) - public void testBalancerRPCDelay() throws Exception { - new TestBalancer().testBalancerRPCDelay(); + public void testBalancerRPCDelayQps3() throws Exception { + testBalancer.testBalancerRPCDelay(3); + } + + @Test(timeout=100000) + public void testBalancerRPCDelayQpsDefault() throws Exception { + testBalancer.testBalancerRPCDelay( + DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestRateLimiter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestRateLimiter.java new file mode 100644 index 00000000000..e1c6af63f6d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestRateLimiter.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.util; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.util.FakeTimer; +import org.junit.Test; + +import static org.junit.Assert.*; + + +/** Tests for {@link RateLimiter}. */ +public class TestRateLimiter { + + // epsilon of 1 ns + private static final double EPSILON = 1.0 / TimeUnit.SECONDS.toNanos(1); + + @Test + public void testRateLimiter() { + FakeTimer timer = new FakeTimer(); + Queue sleepTimesNanos = new LinkedList<>(); + Queue advanceTimerNanos = new LinkedList<>(); + RateLimiter limiter = + new TestingRateLimiter(timer, 10, sleepTimesNanos, advanceTimerNanos); + + final long nanos100ms = TimeUnit.MILLISECONDS.toNanos(100); + + // should be able to acquire immediately the first time + assertEquals(0.0, limiter.acquire(), EPSILON); + assertTrue(sleepTimesNanos.isEmpty()); + + // 100ms of sleep is required the second time + advanceTimerNanos.add(nanos100ms); + assertEquals(0.1, limiter.acquire(), EPSILON); + assertEquals(1, sleepTimesNanos.size()); + assertNextValue(sleepTimesNanos, nanos100ms); + + // test when it takes 2 sleep cycles to be able to acquire + advanceTimerNanos.add(nanos100ms / 2); + advanceTimerNanos.add(nanos100ms / 2); + assertEquals(0.1, limiter.acquire(), EPSILON); + assertEquals(2, sleepTimesNanos.size()); + assertNextValue(sleepTimesNanos, nanos100ms); + assertNextValue(sleepTimesNanos, nanos100ms / 2); + + // if some time passes between acquisitions, the next should be immediate + timer.advanceNanos(nanos100ms * 2); + assertEquals(0.0, limiter.acquire(), EPSILON); + assertTrue(sleepTimesNanos.isEmpty()); + + // the rate limiter has no memory, so although time passed, the next + // acquisition is still rate limited + advanceTimerNanos.add(nanos100ms); + assertEquals(0.1, limiter.acquire(), EPSILON); + assertEquals(1, sleepTimesNanos.size()); + assertNextValue(sleepTimesNanos, nanos100ms); + } + + private static void assertNextValue(Queue queue, long expected) { + Long value = queue.poll(); + assertNotNull(value); + assertEquals(expected, value.longValue()); + } + + private static class TestingRateLimiter extends RateLimiter { + + private final FakeTimer fakeTimer; + private final Queue sleepTimesNanos; + private final Queue advanceTimerNanos; + + TestingRateLimiter(FakeTimer fakeTimer, double maxOpsPerSecond, + Queue sleepTimesNanos, Queue advanceTimerNanos) { + super(fakeTimer, maxOpsPerSecond); + this.fakeTimer = fakeTimer; + this.sleepTimesNanos = sleepTimesNanos; + this.advanceTimerNanos = advanceTimerNanos; + } + + @Override + boolean sleep(long sleepTimeNanos) { + sleepTimesNanos.offer(sleepTimeNanos); + Long advanceNanos = advanceTimerNanos.poll(); + if (advanceNanos == null) { + fail("Unexpected sleep; no timer advance value found"); + } + fakeTimer.advanceNanos(advanceNanos); + return false; + } + } +}