HDFS-14973. More strictly enforce Balancer/Mover/SPS throttling of getBlocks RPCs to NameNodes. Contributed by Erik Krogen.

(cherry picked from b2cc8b6b4a)
(cherry picked from 60655bfe54)
This commit is contained in:
Erik Krogen 2019-11-08 08:57:14 -08:00
parent b9e4fd5f4c
commit 17779adb32
6 changed files with 108 additions and 62 deletions

View File

@ -591,6 +591,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.datanode.metrics.logger.period.seconds"; "dfs.datanode.metrics.logger.period.seconds";
public static final int DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT = public static final int DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT =
600; 600;
/**
* The maximum number of getBlocks RPCs data movement utilities can make to
* a NameNode per second. Values <= 0 disable throttling. This affects
* anything that uses a NameNodeConnector, i.e., the Balancer, Mover,
* and StoragePolicySatisfier.
*/
public static final String DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY = "dfs.namenode.get-blocks.max-qps";
public static final int DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT = 20;
public static final String DFS_BALANCER_MOVEDWINWIDTH_KEY = "dfs.balancer.movedWinWidth"; public static final String DFS_BALANCER_MOVEDWINWIDTH_KEY = "dfs.balancer.movedWinWidth";
public static final long DFS_BALANCER_MOVEDWINWIDTH_DEFAULT = 5400*1000L; public static final long DFS_BALANCER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;

View File

@ -927,10 +927,8 @@ public class Dispatcher {
* move tasks or it has received enough blocks from the namenode, or the * move tasks or it has received enough blocks from the namenode, or the
* elapsed time of the iteration has exceeded the max time limit. * elapsed time of the iteration has exceeded the max time limit.
* *
* @param delay - time to sleep before sending getBlocks. Intended to
* disperse Balancer RPCs to NameNode for large clusters. See HDFS-11384.
*/ */
private void dispatchBlocks(long delay) { private void dispatchBlocks() {
this.blocksToReceive = 2 * getScheduledSize(); this.blocksToReceive = 2 * getScheduledSize();
long previousMoveTimestamp = Time.monotonicNow(); long previousMoveTimestamp = Time.monotonicNow();
while (getScheduledSize() > 0 && !isIterationOver() while (getScheduledSize() > 0 && !isIterationOver()
@ -955,25 +953,15 @@ public class Dispatcher {
if (shouldFetchMoreBlocks()) { if (shouldFetchMoreBlocks()) {
// fetch new blocks // fetch new blocks
try { try {
if(delay > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sleeping " + delay + " msec.");
}
Thread.sleep(delay);
}
final long received = getBlockList(); final long received = getBlockList();
if (received == 0) { if (received == 0) {
return; return;
} }
blocksToReceive -= received; blocksToReceive -= received;
continue; continue;
} catch (InterruptedException ignored) {
// nothing to do
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Exception while getting reportedBlock list", e); LOG.warn("Exception while getting reportedBlock list", e);
return; return;
} finally {
delay = 0L;
} }
} else { } else {
// jump out of while-loop after the configured timeout. // jump out of while-loop after the configured timeout.
@ -1164,12 +1152,6 @@ public class Dispatcher {
return nnc.shouldContinue(dispatchBlockMoves()); return nnc.shouldContinue(dispatchBlockMoves());
} }
/**
* The best-effort limit on the number of RPCs per second
* the Balancer will send to the NameNode.
*/
final static int BALANCER_NUM_RPC_PER_SEC = 20;
/** /**
* Dispatch block moves for each source. The thread selects blocks to move & * Dispatch block moves for each source. The thread selects blocks to move &
* sends request to proxy source to initiate block move. The process is flow * sends request to proxy source to initiate block move. The process is flow
@ -1185,12 +1167,7 @@ public class Dispatcher {
int concurrentThreads = Math.min(sources.size(), int concurrentThreads = Math.min(sources.size(),
((ThreadPoolExecutor)dispatchExecutor).getCorePoolSize()); ((ThreadPoolExecutor)dispatchExecutor).getCorePoolSize());
assert concurrentThreads > 0 : "Number of concurrent threads is 0."; assert concurrentThreads > 0 : "Number of concurrent threads is 0.";
if (LOG.isDebugEnabled()) { LOG.debug("Balancer concurrent dispatcher threads " + concurrentThreads);
LOG.debug("Balancer allowed RPCs per sec = " + BALANCER_NUM_RPC_PER_SEC);
LOG.debug("Balancer concurrent threads = " + concurrentThreads);
LOG.debug("Disperse Interval sec = " +
concurrentThreads / BALANCER_NUM_RPC_PER_SEC);
}
// Determine the size of each mover thread pool per target // Determine the size of each mover thread pool per target
int threadsPerTarget = maxMoverThreads/targets.size(); int threadsPerTarget = maxMoverThreads/targets.size();
@ -1210,23 +1187,15 @@ public class Dispatcher {
LOG.info("Allocating " + threadsPerTarget + " threads per target."); LOG.info("Allocating " + threadsPerTarget + " threads per target.");
} }
long dSec = 0;
final Iterator<Source> i = sources.iterator(); final Iterator<Source> i = sources.iterator();
for (int j = 0; j < futures.length; j++) { for (int j = 0; j < futures.length; j++) {
final Source s = i.next(); final Source s = i.next();
final long delay = dSec * 1000;
futures[j] = dispatchExecutor.submit(new Runnable() { futures[j] = dispatchExecutor.submit(new Runnable() {
@Override @Override
public void run() { public void run() {
s.dispatchBlocks(delay); s.dispatchBlocks();
} }
}); });
// Calculate delay in seconds for the next iteration
if(j >= concurrentThreads) {
dSec = 0;
} else if((j + 1) % BALANCER_NUM_RPC_PER_SEC == 0) {
dSec++;
}
} }
// wait for all dispatcher threads to finish // wait for all dispatcher threads to finish

View File

@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -40,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities.StreamCapability; import org.apache.hadoop.fs.StreamCapabilities.StreamCapability;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
@ -50,7 +52,6 @@ import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
import org.apache.hadoop.hdfs.server.protocol.BalancerProtocols; import org.apache.hadoop.hdfs.server.protocol.BalancerProtocols;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
@ -116,6 +117,7 @@ public class NameNodeConnector implements Closeable {
private final int maxNotChangedIterations; private final int maxNotChangedIterations;
private int notChangedIterations = 0; private int notChangedIterations = 0;
private final RateLimiter getBlocksRateLimiter;
public NameNodeConnector(String name, URI nameNodeUri, Path idPath, public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
List<Path> targetPaths, Configuration conf, List<Path> targetPaths, Configuration conf,
@ -126,6 +128,16 @@ public class NameNodeConnector implements Closeable {
this.targetPaths = targetPaths == null || targetPaths.isEmpty() ? Arrays this.targetPaths = targetPaths == null || targetPaths.isEmpty() ? Arrays
.asList(new Path("/")) : targetPaths; .asList(new Path("/")) : targetPaths;
this.maxNotChangedIterations = maxNotChangedIterations; this.maxNotChangedIterations = maxNotChangedIterations;
int getBlocksMaxQps = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY,
DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT);
if (getBlocksMaxQps > 0) {
LOG.info("getBlocks calls for " + nameNodeUri
+ " will be rate-limited to " + getBlocksMaxQps + " per second");
this.getBlocksRateLimiter = RateLimiter.create(getBlocksMaxQps);
} else {
this.getBlocksRateLimiter = null;
}
this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri, this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
BalancerProtocols.class, fallbackToSimpleAuth).getProxy(); BalancerProtocols.class, fallbackToSimpleAuth).getProxy();
@ -160,8 +172,10 @@ public class NameNodeConnector implements Closeable {
/** @return blocks with locations. */ /** @return blocks with locations. */
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
minBlockSize) minBlockSize) throws IOException {
throws IOException { if (getBlocksRateLimiter != null) {
getBlocksRateLimiter.acquire();
}
return namenode.getBlocks(datanode, size, minBlockSize); return namenode.getBlocks(datanode, size, minBlockSize);
} }

View File

@ -3518,7 +3518,16 @@
HTTPS port for DataNode. HTTPS port for DataNode.
</description> </description>
</property> </property>
<property>
<name>dfs.namenode.get-blocks.max-qps</name>
<value>20</value>
<description>
The maximum number of getBlocks RPCs data movement utilities can make to
a NameNode per second. Values less than or equal to 0 disable throttling.
This affects anything that uses a NameNodeConnector, i.e., the Balancer,
Mover, and StoragePolicySatisfier.
</description>
</property>
<property> <property>
<name>dfs.balancer.dispatcherThreads</name> <name>dfs.balancer.dispatcherThreads</name>
<value>200</value> <value>200</value>

View File

@ -72,6 +72,8 @@ import java.util.Properties;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -128,6 +130,7 @@ import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.junit.After; import org.junit.After;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
@ -157,6 +160,16 @@ public class TestBalancer {
private static MiniKdc kdc; private static MiniKdc kdc;
private static File keytabFile; private static File keytabFile;
private MiniDFSCluster cluster; private MiniDFSCluster cluster;
private AtomicInteger numGetBlocksCalls;
private AtomicLong startGetBlocksTime;
private AtomicLong endGetBlocksTime;
@Before
public void setup() {
numGetBlocksCalls = new AtomicInteger(0);
startGetBlocksTime = new AtomicLong(Long.MAX_VALUE);
endGetBlocksTime = new AtomicLong(Long.MIN_VALUE);
}
@After @After
public void shutdown() throws Exception { public void shutdown() throws Exception {
@ -791,7 +804,7 @@ public class TestBalancer {
long newCapacity, String newRack, NewNodeInfo nodes, long newCapacity, String newRack, NewNodeInfo nodes,
boolean useTool, boolean useFile) throws Exception { boolean useTool, boolean useFile) throws Exception {
doTest(conf, capacities, racks, newCapacity, newRack, nodes, doTest(conf, capacities, racks, newCapacity, newRack, nodes,
useTool, useFile, false); useTool, useFile, false, 0.3);
} }
/** This test start a cluster with specified number of nodes, /** This test start a cluster with specified number of nodes,
@ -810,12 +823,14 @@ public class TestBalancer {
* @param useFile - if true, the hosts to included or excluded will be stored in a * @param useFile - if true, the hosts to included or excluded will be stored in a
* file and then later read from the file. * file and then later read from the file.
* @param useNamesystemSpy - spy on FSNamesystem if true * @param useNamesystemSpy - spy on FSNamesystem if true
* @param clusterUtilization - The utilization of the cluster to start, from
* 0.0 to 1.0
* @throws Exception * @throws Exception
*/ */
private void doTest(Configuration conf, long[] capacities, private void doTest(Configuration conf, long[] capacities,
String[] racks, long newCapacity, String newRack, NewNodeInfo nodes, String[] racks, long newCapacity, String newRack, NewNodeInfo nodes,
boolean useTool, boolean useFile, boolean useTool, boolean useFile,
boolean useNamesystemSpy) throws Exception { boolean useNamesystemSpy, double clusterUtilization) throws Exception {
LOG.info("capacities = " + long2String(capacities)); LOG.info("capacities = " + long2String(capacities));
LOG.info("racks = " + Arrays.asList(racks)); LOG.info("racks = " + Arrays.asList(racks));
LOG.info("newCapacity= " + newCapacity); LOG.info("newCapacity= " + newCapacity);
@ -845,8 +860,8 @@ public class TestBalancer {
long totalCapacity = sum(capacities); long totalCapacity = sum(capacities);
// fill up the cluster to be 30% full // fill up the cluster to be `clusterUtilization` full
long totalUsedSpace = totalCapacity*3/10; long totalUsedSpace = (long) (totalCapacity * clusterUtilization);
createFile(cluster, filePath, totalUsedSpace / numOfDatanodes, createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
(short) numOfDatanodes, 0); (short) numOfDatanodes, 0);
@ -2135,33 +2150,34 @@ public class TestBalancer {
} }
} }
private static int numGetBlocksCalls;
private static long startGetBlocksTime, endGetBlocksTime;
private void spyFSNamesystem(NameNode nn) throws IOException { private void spyFSNamesystem(NameNode nn) throws IOException {
FSNamesystem fsnSpy = NameNodeAdapter.spyOnNamesystem(nn); FSNamesystem fsnSpy = NameNodeAdapter.spyOnNamesystem(nn);
numGetBlocksCalls = 0;
endGetBlocksTime = startGetBlocksTime = Time.monotonicNow();
doAnswer(new Answer<BlocksWithLocations>() { doAnswer(new Answer<BlocksWithLocations>() {
@Override @Override
public BlocksWithLocations answer(InvocationOnMock invocation) public BlocksWithLocations answer(InvocationOnMock invocation)
throws Throwable { throws Throwable {
long startTime = Time.monotonicNow();
startGetBlocksTime.getAndUpdate((curr) -> Math.min(curr, startTime));
BlocksWithLocations blk = BlocksWithLocations blk =
(BlocksWithLocations)invocation.callRealMethod(); (BlocksWithLocations)invocation.callRealMethod();
endGetBlocksTime = Time.monotonicNow(); long endTime = Time.monotonicNow();
numGetBlocksCalls++; endGetBlocksTime.getAndUpdate((curr) -> Math.max(curr, endTime));
numGetBlocksCalls.incrementAndGet();
return blk; return blk;
}}).when(fsnSpy).getBlocks(any(DatanodeID.class), anyLong(), anyLong()); }}).when(fsnSpy).getBlocks(any(DatanodeID.class), anyLong(), anyLong());
} }
/** /**
* Test that makes the Balancer to disperse RPCs to the NameNode * Test that makes the Balancer to disperse RPCs to the NameNode
* in order to avoid NN's RPC queue saturation. * in order to avoid NN's RPC queue saturation. This not marked as @Test
* because it is run from {@link TestBalancerRPCDelay}.
*/ */
void testBalancerRPCDelay() throws Exception { void testBalancerRPCDelay(int getBlocksMaxQps) throws Exception {
final Configuration conf = new HdfsConfiguration(); final Configuration conf = new HdfsConfiguration();
initConf(conf); initConf(conf);
conf.setInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, 30); conf.setInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, 30);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY,
getBlocksMaxQps);
int numDNs = 20; int numDNs = 20;
long[] capacities = new long[numDNs]; long[] capacities = new long[numDNs];
@ -2171,16 +2187,22 @@ public class TestBalancer {
racks[i] = (i < numDNs/2 ? RACK0 : RACK1); racks[i] = (i < numDNs/2 ? RACK0 : RACK1);
} }
doTest(conf, capacities, racks, CAPACITY, RACK2, doTest(conf, capacities, racks, CAPACITY, RACK2,
new PortNumberBasedNodes(3, 0, 0), false, false, true); // Use only 1 node and set the starting capacity to 50% to allow the
// balancing to complete in only one iteration. This is necessary
// because the startGetBlocksTime and endGetBlocksTime measures across
// all get block calls, so if two iterations are performed, the duration
// also includes the time it took to perform the block move ops in the
// first iteration
new PortNumberBasedNodes(1, 0, 0), false, false, true, 0.5);
assertTrue("Number of getBlocks should be not less than " + assertTrue("Number of getBlocks should be not less than " +
Dispatcher.BALANCER_NUM_RPC_PER_SEC, getBlocksMaxQps, numGetBlocksCalls.get() >= getBlocksMaxQps);
numGetBlocksCalls > Dispatcher.BALANCER_NUM_RPC_PER_SEC); long durationMs = 1 + endGetBlocksTime.get() - startGetBlocksTime.get();
long d = 1 + endGetBlocksTime - startGetBlocksTime; int durationSec = (int) Math.ceil(durationMs / 1000.0);
LOG.info("Balancer executed " + numGetBlocksCalls LOG.info("Balancer executed " + numGetBlocksCalls.get() + " getBlocks in "
+ " getBlocks in " + d + " msec."); + durationMs + " msec (round up to " + durationSec + " sec)");
assertTrue("Expected BALANCER_NUM_RPC_PER_SEC = " + long getBlockCallsPerSecond = numGetBlocksCalls.get() / durationSec;
Dispatcher.BALANCER_NUM_RPC_PER_SEC, assertTrue("Expected balancer getBlocks calls per second <= " +
(numGetBlocksCalls * 1000 / d) < Dispatcher.BALANCER_NUM_RPC_PER_SEC); getBlocksMaxQps, getBlockCallsPerSecond <= getBlocksMaxQps);
} }
/** /**

View File

@ -17,6 +17,9 @@
*/ */
package org.apache.hadoop.hdfs.server.balancer; package org.apache.hadoop.hdfs.server.balancer;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.junit.After;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
/** /**
@ -25,8 +28,29 @@ import org.junit.Test;
*/ */
public class TestBalancerRPCDelay { public class TestBalancerRPCDelay {
private TestBalancer testBalancer;
@Before
public void setup() {
testBalancer = new TestBalancer();
testBalancer.setup();
}
@After
public void teardown() throws Exception {
if (testBalancer != null) {
testBalancer.shutdown();
}
}
@Test(timeout=100000) @Test(timeout=100000)
public void testBalancerRPCDelay() throws Exception { public void testBalancerRPCDelayQps3() throws Exception {
new TestBalancer().testBalancerRPCDelay(); testBalancer.testBalancerRPCDelay(3);
}
@Test(timeout=100000)
public void testBalancerRPCDelayQpsDefault() throws Exception {
testBalancer.testBalancerRPCDelay(
DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT);
} }
} }