HDFS-14973. More strictly enforce Balancer/Mover/SPS throttling of getBlocks RPCs to NameNodes. Contributed by Erik Krogen.
(cherry picked fromb2cc8b6b4a
) (cherry picked from60655bfe54
) (cherry picked from17779adb32
)
This commit is contained in:
parent
3a7121b0b6
commit
403892264b
|
@ -498,6 +498,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||
"dfs.datanode.metrics.logger.period.seconds";
|
||||
public static final int DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT =
|
||||
600;
|
||||
/**
|
||||
* The maximum number of getBlocks RPCs data movement utilities can make to
|
||||
* a NameNode per second. Values <= 0 disable throttling. This affects
|
||||
* anything that uses a NameNodeConnector, i.e., the Balancer, Mover,
|
||||
* and StoragePolicySatisfier.
|
||||
*/
|
||||
public static final String DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY = "dfs.namenode.get-blocks.max-qps";
|
||||
public static final int DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT = 20;
|
||||
|
||||
public static final String DFS_BALANCER_MOVEDWINWIDTH_KEY = "dfs.balancer.movedWinWidth";
|
||||
public static final long DFS_BALANCER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
|
||||
|
|
|
@ -821,10 +821,8 @@ public class Dispatcher {
|
|||
* move tasks or it has received enough blocks from the namenode, or the
|
||||
* elapsed time of the iteration has exceeded the max time limit.
|
||||
*
|
||||
* @param delay - time to sleep before sending getBlocks. Intended to
|
||||
* disperse Balancer RPCs to NameNode for large clusters. See HDFS-11384.
|
||||
*/
|
||||
private void dispatchBlocks(long delay) {
|
||||
private void dispatchBlocks() {
|
||||
this.blocksToReceive = 2 * getScheduledSize();
|
||||
long previousMoveTimestamp = Time.monotonicNow();
|
||||
while (getScheduledSize() > 0 && !isIterationOver()
|
||||
|
@ -849,25 +847,15 @@ public class Dispatcher {
|
|||
if (shouldFetchMoreBlocks()) {
|
||||
// fetch new blocks
|
||||
try {
|
||||
if(delay > 0) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Sleeping " + delay + " msec.");
|
||||
}
|
||||
Thread.sleep(delay);
|
||||
}
|
||||
final long received = getBlockList();
|
||||
if (received == 0) {
|
||||
return;
|
||||
}
|
||||
blocksToReceive -= received;
|
||||
continue;
|
||||
} catch (InterruptedException ignored) {
|
||||
// nothing to do
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Exception while getting block list", e);
|
||||
return;
|
||||
} finally {
|
||||
delay = 0L;
|
||||
}
|
||||
} else {
|
||||
// jump out of while-loop after the configured timeout.
|
||||
|
@ -1059,12 +1047,6 @@ public class Dispatcher {
|
|||
return nnc.shouldContinue(dispatchBlockMoves());
|
||||
}
|
||||
|
||||
/**
|
||||
* The best-effort limit on the number of RPCs per second
|
||||
* the Balancer will send to the NameNode.
|
||||
*/
|
||||
final static int BALANCER_NUM_RPC_PER_SEC = 20;
|
||||
|
||||
/**
|
||||
* Dispatch block moves for each source. The thread selects blocks to move &
|
||||
* sends request to proxy source to initiate block move. The process is flow
|
||||
|
@ -1080,12 +1062,7 @@ public class Dispatcher {
|
|||
int concurrentThreads = Math.min(sources.size(),
|
||||
((ThreadPoolExecutor)dispatchExecutor).getCorePoolSize());
|
||||
assert concurrentThreads > 0 : "Number of concurrent threads is 0.";
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Balancer allowed RPCs per sec = " + BALANCER_NUM_RPC_PER_SEC);
|
||||
LOG.debug("Balancer concurrent threads = " + concurrentThreads);
|
||||
LOG.debug("Disperse Interval sec = " +
|
||||
concurrentThreads / BALANCER_NUM_RPC_PER_SEC);
|
||||
}
|
||||
LOG.debug("Balancer concurrent dispatcher threads " + concurrentThreads);
|
||||
|
||||
// Determine the size of each mover thread pool per target
|
||||
int threadsPerTarget = maxMoverThreads/targets.size();
|
||||
|
@ -1105,23 +1082,15 @@ public class Dispatcher {
|
|||
LOG.info("Allocating " + threadsPerTarget + " threads per target.");
|
||||
}
|
||||
|
||||
long dSec = 0;
|
||||
final Iterator<Source> i = sources.iterator();
|
||||
for (int j = 0; j < futures.length; j++) {
|
||||
final Source s = i.next();
|
||||
final long delay = dSec * 1000;
|
||||
futures[j] = dispatchExecutor.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
s.dispatchBlocks(delay);
|
||||
s.dispatchBlocks();
|
||||
}
|
||||
});
|
||||
// Calculate delay in seconds for the next iteration
|
||||
if(j >= concurrentThreads) {
|
||||
dSec = 0;
|
||||
} else if((j + 1) % BALANCER_NUM_RPC_PER_SEC == 0) {
|
||||
dSec++;
|
||||
}
|
||||
}
|
||||
|
||||
// wait for all dispatcher threads to finish
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.NameNodeProxies;
|
||||
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||
|
@ -50,6 +51,7 @@ import org.apache.hadoop.hdfs.server.protocol.BalancerProtocols;
|
|||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.util.RateLimiter;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
||||
|
@ -114,6 +116,7 @@ public class NameNodeConnector implements Closeable {
|
|||
|
||||
private final int maxNotChangedIterations;
|
||||
private int notChangedIterations = 0;
|
||||
private final RateLimiter getBlocksRateLimiter;
|
||||
|
||||
public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
|
||||
List<Path> targetPaths, Configuration conf,
|
||||
|
@ -124,6 +127,16 @@ public class NameNodeConnector implements Closeable {
|
|||
this.targetPaths = targetPaths == null || targetPaths.isEmpty() ? Arrays
|
||||
.asList(new Path("/")) : targetPaths;
|
||||
this.maxNotChangedIterations = maxNotChangedIterations;
|
||||
int getBlocksMaxQps = conf.getInt(
|
||||
DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT);
|
||||
if (getBlocksMaxQps > 0) {
|
||||
LOG.info("getBlocks calls for " + nameNodeUri
|
||||
+ " will be rate-limited to " + getBlocksMaxQps + " per second");
|
||||
this.getBlocksRateLimiter = RateLimiter.create(getBlocksMaxQps);
|
||||
} else {
|
||||
this.getBlocksRateLimiter = null;
|
||||
}
|
||||
|
||||
this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
|
||||
BalancerProtocols.class, fallbackToSimpleAuth).getProxy();
|
||||
|
@ -159,6 +172,9 @@ public class NameNodeConnector implements Closeable {
|
|||
/** @return blocks with locations. */
|
||||
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
|
||||
throws IOException {
|
||||
if (getBlocksRateLimiter != null) {
|
||||
getBlocksRateLimiter.acquire();
|
||||
}
|
||||
return namenode.getBlocks(datanode, size);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,104 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.util;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import org.apache.hadoop.util.Timer;
|
||||
|
||||
/**
|
||||
* A rate limiter which loosely emulates the behavior of Guava's RateLimiter
|
||||
* for branches which do not have that class available. The APIs are intended
|
||||
* to match RateLimiter's to avoid code changes in calling classes when
|
||||
* switching to Guava.
|
||||
*/
|
||||
public class RateLimiter {
|
||||
|
||||
private final Timer timer;
|
||||
/** The period that should elapse between any two operations. */
|
||||
private final long opDispersalPeriodNanos;
|
||||
|
||||
/** The last time an operation completed, in system nanos (not wall time). */
|
||||
private final AtomicLong lastOpTimeNanos;
|
||||
|
||||
public static RateLimiter create(double maxOpsPerSecond) {
|
||||
return new RateLimiter(new Timer(), maxOpsPerSecond);
|
||||
}
|
||||
|
||||
RateLimiter(Timer timer, double maxOpsPerSecond) {
|
||||
this.timer = timer;
|
||||
if (maxOpsPerSecond <= 0) {
|
||||
throw new IllegalArgumentException("RateLimiter max operations per "
|
||||
+ "second must be > 0 but was " + maxOpsPerSecond);
|
||||
}
|
||||
opDispersalPeriodNanos =
|
||||
(long) (TimeUnit.SECONDS.toNanos(1) / maxOpsPerSecond);
|
||||
lastOpTimeNanos = new AtomicLong(Long.MIN_VALUE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to acquire a permit to perform an operation. This will block until
|
||||
* enough time has elapsed since the last operation to perform another. No
|
||||
* fairness is provided; acquisition attempts will be serviced in an arbitrary
|
||||
* order rather than FIFO.
|
||||
*
|
||||
* @return The time, in seconds, it took to acquire a permit.
|
||||
*/
|
||||
public double acquire() {
|
||||
boolean interrupted = false;
|
||||
long startTimeNanos = Long.MAX_VALUE;
|
||||
try {
|
||||
while (true) {
|
||||
long currTimeNanos = timer.monotonicNowNanos();
|
||||
startTimeNanos = Math.min(currTimeNanos, startTimeNanos);
|
||||
long lastOpTimeLocal = lastOpTimeNanos.get();
|
||||
long nextAllowedOpTime = lastOpTimeLocal + opDispersalPeriodNanos;
|
||||
if (currTimeNanos >= nextAllowedOpTime) {
|
||||
// enough time has elapsed; attempt to acquire the current permit
|
||||
boolean acquired =
|
||||
lastOpTimeNanos.compareAndSet(lastOpTimeLocal, currTimeNanos);
|
||||
// if the CAS failed, another thread acquired the permit, try again
|
||||
if (acquired) {
|
||||
return (currTimeNanos - startTimeNanos)
|
||||
/ ((double) TimeUnit.SECONDS.toNanos(1));
|
||||
}
|
||||
} else {
|
||||
interrupted |= sleep(nextAllowedOpTime - currTimeNanos);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (interrupted) {
|
||||
// allow other levels to be aware of the interrupt
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Sleep for some amount of nanoseconds. Returns true iff interrupted. */
|
||||
boolean sleep(long sleepTimeNanos) {
|
||||
long sleepTimeMillis = TimeUnit.NANOSECONDS.toMillis(sleepTimeNanos);
|
||||
try {
|
||||
Thread.sleep(sleepTimeMillis, (int) (sleepTimeNanos
|
||||
- TimeUnit.MILLISECONDS.toNanos(sleepTimeMillis)));
|
||||
} catch (InterruptedException ie) {
|
||||
// swallow and continue, but allow the interrupt to be remembered
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -3369,7 +3369,16 @@
|
|||
HTTPS port for DataNode.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.get-blocks.max-qps</name>
|
||||
<value>20</value>
|
||||
<description>
|
||||
The maximum number of getBlocks RPCs data movement utilities can make to
|
||||
a NameNode per second. Values less than or equal to 0 disable throttling.
|
||||
This affects anything that uses a NameNodeConnector, i.e., the Balancer,
|
||||
Mover, and StoragePolicySatisfier.
|
||||
</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>dfs.balancer.dispatcherThreads</name>
|
||||
<value>200</value>
|
||||
|
|
|
@ -70,6 +70,8 @@ import java.util.Properties;
|
|||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -124,6 +126,7 @@ import org.apache.hadoop.util.Time;
|
|||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
@ -153,6 +156,16 @@ public class TestBalancer {
|
|||
private static MiniKdc kdc;
|
||||
private static File keytabFile;
|
||||
private MiniDFSCluster cluster;
|
||||
private AtomicInteger numGetBlocksCalls;
|
||||
private AtomicLong startGetBlocksTime;
|
||||
private AtomicLong endGetBlocksTime;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
numGetBlocksCalls = new AtomicInteger(0);
|
||||
startGetBlocksTime = new AtomicLong(Long.MAX_VALUE);
|
||||
endGetBlocksTime = new AtomicLong(Long.MIN_VALUE);
|
||||
}
|
||||
|
||||
static void initSecureConf(Configuration conf) throws Exception {
|
||||
baseDir = new File(System.getProperty("test.build.dir", "target/test-dir"),
|
||||
|
@ -765,7 +778,7 @@ public class TestBalancer {
|
|||
long newCapacity, String newRack, NewNodeInfo nodes,
|
||||
boolean useTool, boolean useFile) throws Exception {
|
||||
doTest(conf, capacities, racks, newCapacity, newRack, nodes,
|
||||
useTool, useFile, false);
|
||||
useTool, useFile, false, 0.3);
|
||||
}
|
||||
|
||||
/** This test start a cluster with specified number of nodes,
|
||||
|
@ -784,12 +797,14 @@ public class TestBalancer {
|
|||
* @param useFile - if true, the hosts to included or excluded will be stored in a
|
||||
* file and then later read from the file.
|
||||
* @param useNamesystemSpy - spy on FSNamesystem if true
|
||||
* @param clusterUtilization - The utilization of the cluster to start, from
|
||||
* 0.0 to 1.0
|
||||
* @throws Exception
|
||||
*/
|
||||
private void doTest(Configuration conf, long[] capacities,
|
||||
String[] racks, long newCapacity, String newRack, NewNodeInfo nodes,
|
||||
boolean useTool, boolean useFile,
|
||||
boolean useNamesystemSpy) throws Exception {
|
||||
boolean useNamesystemSpy, double clusterUtilization) throws Exception {
|
||||
LOG.info("capacities = " + long2String(capacities));
|
||||
LOG.info("racks = " + Arrays.asList(racks));
|
||||
LOG.info("newCapacity= " + newCapacity);
|
||||
|
@ -819,8 +834,8 @@ public class TestBalancer {
|
|||
|
||||
long totalCapacity = sum(capacities);
|
||||
|
||||
// fill up the cluster to be 30% full
|
||||
long totalUsedSpace = totalCapacity*3/10;
|
||||
// fill up the cluster to be `clusterUtilization` full
|
||||
long totalUsedSpace = (long) (totalCapacity * clusterUtilization);
|
||||
createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
|
||||
(short) numOfDatanodes, 0);
|
||||
|
||||
|
@ -1938,33 +1953,48 @@ public class TestBalancer {
|
|||
}
|
||||
}
|
||||
|
||||
private static int numGetBlocksCalls;
|
||||
private static long startGetBlocksTime, endGetBlocksTime;
|
||||
|
||||
private void spyFSNamesystem(NameNode nn) throws IOException {
|
||||
FSNamesystem fsnSpy = NameNodeAdapter.spyOnNamesystem(nn);
|
||||
numGetBlocksCalls = 0;
|
||||
endGetBlocksTime = startGetBlocksTime = Time.monotonicNow();
|
||||
doAnswer(new Answer<BlocksWithLocations>() {
|
||||
@Override
|
||||
public BlocksWithLocations answer(InvocationOnMock invocation)
|
||||
throws Throwable {
|
||||
long startTime = Time.monotonicNow();
|
||||
setAtomicLongToMinMax(startGetBlocksTime, startTime, false);
|
||||
BlocksWithLocations blk =
|
||||
(BlocksWithLocations)invocation.callRealMethod();
|
||||
endGetBlocksTime = Time.monotonicNow();
|
||||
numGetBlocksCalls++;
|
||||
long endTime = Time.monotonicNow();
|
||||
setAtomicLongToMinMax(endGetBlocksTime, endTime, true);
|
||||
numGetBlocksCalls.incrementAndGet();
|
||||
return blk;
|
||||
}}).when(fsnSpy).getBlocks(any(DatanodeID.class), anyLong());
|
||||
}
|
||||
|
||||
private static void setAtomicLongToMinMax(AtomicLong value, long newVal,
|
||||
boolean useMax) {
|
||||
while (true) {
|
||||
long currVal = value.get();
|
||||
if ((useMax && newVal > currVal) || (!useMax && newVal < currVal)) {
|
||||
if (value.compareAndSet(currVal, newVal)) {
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that makes the Balancer to disperse RPCs to the NameNode
|
||||
* in order to avoid NN's RPC queue saturation.
|
||||
* in order to avoid NN's RPC queue saturation. This not marked as @Test
|
||||
* because it is run from {@link TestBalancerRPCDelay}.
|
||||
*/
|
||||
void testBalancerRPCDelay() throws Exception {
|
||||
void testBalancerRPCDelay(int getBlocksMaxQps) throws Exception {
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
initConf(conf);
|
||||
conf.setInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, 30);
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY,
|
||||
getBlocksMaxQps);
|
||||
|
||||
int numDNs = 20;
|
||||
long[] capacities = new long[numDNs];
|
||||
|
@ -1974,16 +2004,22 @@ public class TestBalancer {
|
|||
racks[i] = (i < numDNs/2 ? RACK0 : RACK1);
|
||||
}
|
||||
doTest(conf, capacities, racks, CAPACITY, RACK2,
|
||||
new PortNumberBasedNodes(3, 0, 0), false, false, true);
|
||||
// Use only 1 node and set the starting capacity to 50% to allow the
|
||||
// balancing to complete in only one iteration. This is necessary
|
||||
// because the startGetBlocksTime and endGetBlocksTime measures across
|
||||
// all get block calls, so if two iterations are performed, the duration
|
||||
// also includes the time it took to perform the block move ops in the
|
||||
// first iteration
|
||||
new PortNumberBasedNodes(1, 0, 0), false, false, true, 0.5);
|
||||
assertTrue("Number of getBlocks should be not less than " +
|
||||
Dispatcher.BALANCER_NUM_RPC_PER_SEC,
|
||||
numGetBlocksCalls > Dispatcher.BALANCER_NUM_RPC_PER_SEC);
|
||||
long d = 1 + endGetBlocksTime - startGetBlocksTime;
|
||||
LOG.info("Balancer executed " + numGetBlocksCalls
|
||||
+ " getBlocks in " + d + " msec.");
|
||||
assertTrue("Expected BALANCER_NUM_RPC_PER_SEC = " +
|
||||
Dispatcher.BALANCER_NUM_RPC_PER_SEC,
|
||||
(numGetBlocksCalls * 1000 / d) < Dispatcher.BALANCER_NUM_RPC_PER_SEC);
|
||||
getBlocksMaxQps, numGetBlocksCalls.get() >= getBlocksMaxQps);
|
||||
long durationMs = 1 + endGetBlocksTime.get() - startGetBlocksTime.get();
|
||||
int durationSec = (int) Math.ceil(durationMs / 1000.0);
|
||||
LOG.info("Balancer executed " + numGetBlocksCalls.get() + " getBlocks in "
|
||||
+ durationMs + " msec (round up to " + durationSec + " sec)");
|
||||
long getBlockCallsPerSecond = numGetBlocksCalls.get() / durationSec;
|
||||
assertTrue("Expected balancer getBlocks calls per second <= " +
|
||||
getBlocksMaxQps, getBlockCallsPerSecond <= getBlocksMaxQps);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -17,6 +17,9 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.balancer;
|
||||
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
|
@ -25,8 +28,29 @@ import org.junit.Test;
|
|||
*/
|
||||
public class TestBalancerRPCDelay {
|
||||
|
||||
private TestBalancer testBalancer;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
testBalancer = new TestBalancer();
|
||||
testBalancer.setup();
|
||||
}
|
||||
|
||||
@After
|
||||
public void teardown() throws Exception {
|
||||
if (testBalancer != null) {
|
||||
testBalancer.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=100000)
|
||||
public void testBalancerRPCDelay() throws Exception {
|
||||
new TestBalancer().testBalancerRPCDelay();
|
||||
public void testBalancerRPCDelayQps3() throws Exception {
|
||||
testBalancer.testBalancerRPCDelay(3);
|
||||
}
|
||||
|
||||
@Test(timeout=100000)
|
||||
public void testBalancerRPCDelayQpsDefault() throws Exception {
|
||||
testBalancer.testBalancerRPCDelay(
|
||||
DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,107 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.util;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.hadoop.util.FakeTimer;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
|
||||
/** Tests for {@link RateLimiter}. */
|
||||
public class TestRateLimiter {
|
||||
|
||||
// epsilon of 1 ns
|
||||
private static final double EPSILON = 1.0 / TimeUnit.SECONDS.toNanos(1);
|
||||
|
||||
@Test
|
||||
public void testRateLimiter() {
|
||||
FakeTimer timer = new FakeTimer();
|
||||
Queue<Long> sleepTimesNanos = new LinkedList<>();
|
||||
Queue<Long> advanceTimerNanos = new LinkedList<>();
|
||||
RateLimiter limiter =
|
||||
new TestingRateLimiter(timer, 10, sleepTimesNanos, advanceTimerNanos);
|
||||
|
||||
final long nanos100ms = TimeUnit.MILLISECONDS.toNanos(100);
|
||||
|
||||
// should be able to acquire immediately the first time
|
||||
assertEquals(0.0, limiter.acquire(), EPSILON);
|
||||
assertTrue(sleepTimesNanos.isEmpty());
|
||||
|
||||
// 100ms of sleep is required the second time
|
||||
advanceTimerNanos.add(nanos100ms);
|
||||
assertEquals(0.1, limiter.acquire(), EPSILON);
|
||||
assertEquals(1, sleepTimesNanos.size());
|
||||
assertNextValue(sleepTimesNanos, nanos100ms);
|
||||
|
||||
// test when it takes 2 sleep cycles to be able to acquire
|
||||
advanceTimerNanos.add(nanos100ms / 2);
|
||||
advanceTimerNanos.add(nanos100ms / 2);
|
||||
assertEquals(0.1, limiter.acquire(), EPSILON);
|
||||
assertEquals(2, sleepTimesNanos.size());
|
||||
assertNextValue(sleepTimesNanos, nanos100ms);
|
||||
assertNextValue(sleepTimesNanos, nanos100ms / 2);
|
||||
|
||||
// if some time passes between acquisitions, the next should be immediate
|
||||
timer.advanceNanos(nanos100ms * 2);
|
||||
assertEquals(0.0, limiter.acquire(), EPSILON);
|
||||
assertTrue(sleepTimesNanos.isEmpty());
|
||||
|
||||
// the rate limiter has no memory, so although time passed, the next
|
||||
// acquisition is still rate limited
|
||||
advanceTimerNanos.add(nanos100ms);
|
||||
assertEquals(0.1, limiter.acquire(), EPSILON);
|
||||
assertEquals(1, sleepTimesNanos.size());
|
||||
assertNextValue(sleepTimesNanos, nanos100ms);
|
||||
}
|
||||
|
||||
private static void assertNextValue(Queue<Long> queue, long expected) {
|
||||
Long value = queue.poll();
|
||||
assertNotNull(value);
|
||||
assertEquals(expected, value.longValue());
|
||||
}
|
||||
|
||||
private static class TestingRateLimiter extends RateLimiter {
|
||||
|
||||
private final FakeTimer fakeTimer;
|
||||
private final Queue<Long> sleepTimesNanos;
|
||||
private final Queue<Long> advanceTimerNanos;
|
||||
|
||||
TestingRateLimiter(FakeTimer fakeTimer, double maxOpsPerSecond,
|
||||
Queue<Long> sleepTimesNanos, Queue<Long> advanceTimerNanos) {
|
||||
super(fakeTimer, maxOpsPerSecond);
|
||||
this.fakeTimer = fakeTimer;
|
||||
this.sleepTimesNanos = sleepTimesNanos;
|
||||
this.advanceTimerNanos = advanceTimerNanos;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean sleep(long sleepTimeNanos) {
|
||||
sleepTimesNanos.offer(sleepTimeNanos);
|
||||
Long advanceNanos = advanceTimerNanos.poll();
|
||||
if (advanceNanos == null) {
|
||||
fail("Unexpected sleep; no timer advance value found");
|
||||
}
|
||||
fakeTimer.advanceNanos(advanceNanos);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue