HDFS-11384. Balancer disperses getBlocks calls to avoid NameNode's rpc queue saturation. Contributed by Konstantin V Shvachko.

(cherry picked from commit 28eb2aabeb)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
This commit is contained in:
Konstantin V Shvachko 2017-04-26 17:28:49 -07:00
parent e84588eb03
commit 4cbf5c5c41
5 changed files with 195 additions and 12 deletions

View File

@ -40,6 +40,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -806,8 +807,11 @@ public class Dispatcher {
* namenode for more blocks. It terminates when it has dispatch enough block * namenode for more blocks. It terminates when it has dispatch enough block
* move tasks or it has received enough blocks from the namenode, or the * move tasks or it has received enough blocks from the namenode, or the
* elapsed time of the iteration has exceeded the max time limit. * elapsed time of the iteration has exceeded the max time limit.
*
* @param delay - time to sleep before sending getBlocks. Intended to
* disperse Balancer RPCs to NameNode for large clusters. See HDFS-11384.
*/ */
private void dispatchBlocks() { private void dispatchBlocks(long delay) {
this.blocksToReceive = 2 * getScheduledSize(); this.blocksToReceive = 2 * getScheduledSize();
long previousMoveTimestamp = Time.monotonicNow(); long previousMoveTimestamp = Time.monotonicNow();
while (getScheduledSize() > 0 && !isIterationOver() while (getScheduledSize() > 0 && !isIterationOver()
@ -832,15 +836,25 @@ public class Dispatcher {
if (shouldFetchMoreBlocks()) { if (shouldFetchMoreBlocks()) {
// fetch new blocks // fetch new blocks
try { try {
if(delay > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sleeping " + delay + " msec.");
}
Thread.sleep(delay);
}
final long received = getBlockList(); final long received = getBlockList();
if (received == 0) { if (received == 0) {
return; return;
} }
blocksToReceive -= received; blocksToReceive -= received;
continue; continue;
} catch (InterruptedException ignored) {
// nothing to do
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Exception while getting block list", e); LOG.warn("Exception while getting block list", e);
return; return;
} finally {
delay = 0L;
} }
} else { } else {
// jump out of while-loop after the configured timeout. // jump out of while-loop after the configured timeout.
@ -1031,6 +1045,12 @@ public class Dispatcher {
return nnc.shouldContinue(dispatchBlockMoves()); return nnc.shouldContinue(dispatchBlockMoves());
} }
/**
 * Best-effort cap on how many RPCs per second the Balancer
 * sends to the NameNode.
 */
static final int BALANCER_NUM_RPC_PER_SEC = 20;
/** /**
* Dispatch block moves for each source. The thread selects blocks to move & * Dispatch block moves for each source. The thread selects blocks to move &
* sends request to proxy source to initiate block move. The process is flow * sends request to proxy source to initiate block move. The process is flow
@ -1043,15 +1063,32 @@ public class Dispatcher {
final long bytesLastMoved = getBytesMoved(); final long bytesLastMoved = getBytesMoved();
final Future<?>[] futures = new Future<?>[sources.size()]; final Future<?>[] futures = new Future<?>[sources.size()];
int concurrentThreads = Math.min(sources.size(),
((ThreadPoolExecutor)dispatchExecutor).getCorePoolSize());
assert concurrentThreads > 0 : "Number of concurrent threads is 0.";
if (LOG.isDebugEnabled()) {
LOG.debug("Balancer allowed RPCs per sec = " + BALANCER_NUM_RPC_PER_SEC);
LOG.debug("Balancer concurrent threads = " + concurrentThreads);
LOG.debug("Disperse Interval sec = " +
concurrentThreads / BALANCER_NUM_RPC_PER_SEC);
}
long dSec = 0;
final Iterator<Source> i = sources.iterator(); final Iterator<Source> i = sources.iterator();
for (int j = 0; j < futures.length; j++) { for (int j = 0; j < futures.length; j++) {
final Source s = i.next(); final Source s = i.next();
final long delay = dSec * 1000;
futures[j] = dispatchExecutor.submit(new Runnable() { futures[j] = dispatchExecutor.submit(new Runnable() {
@Override @Override
public void run() { public void run() {
s.dispatchBlocks(); s.dispatchBlocks(delay);
} }
}); });
// Calculate delay in seconds for the next iteration
if(j >= concurrentThreads) {
dSec = 0;
} else if((j + 1) % BALANCER_NUM_RPC_PER_SEC == 0) {
dSec++;
}
} }
// wait for all dispatcher threads to finish // wait for all dispatcher threads to finish

View File

@ -48,6 +48,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue; import static org.junit.Assume.assumeTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.doAnswer;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -101,9 +104,14 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithUpgradeDomain;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.minikdc.MiniKdc; import org.apache.hadoop.minikdc.MiniKdc;
@ -117,6 +125,8 @@ import org.apache.hadoop.util.Tool;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.junit.After; import org.junit.After;
import org.junit.Test; import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/** /**
* This class tests if a balancer schedules tasks correctly. * This class tests if a balancer schedules tasks correctly.
@ -126,6 +136,7 @@ public class TestBalancer {
static { static {
GenericTestUtils.setLogLevel(Balancer.LOG, Level.ALL); GenericTestUtils.setLogLevel(Balancer.LOG, Level.ALL);
GenericTestUtils.setLogLevel(Dispatcher.LOG, Level.DEBUG);
} }
final static long CAPACITY = 5000L; final static long CAPACITY = 5000L;
@ -750,6 +761,13 @@ public class TestBalancer {
doTest(conf, capacities, racks, newCapacity, newRack, null, useTool, false); doTest(conf, capacities, racks, newCapacity, newRack, null, useTool, false);
} }
/**
 * Convenience overload of doTest that runs the same scenario without
 * spying on the FSNamesystem.
 */
private void doTest(Configuration conf, long[] capacities, String[] racks,
    long newCapacity, String newRack, NewNodeInfo nodes,
    boolean useTool, boolean useFile) throws Exception {
  // Callers of this overload never request the namesystem spy.
  final boolean useNamesystemSpy = false;
  doTest(conf, capacities, racks, newCapacity, newRack, nodes, useTool,
      useFile, useNamesystemSpy);
}
/** This test start a cluster with specified number of nodes, /** This test start a cluster with specified number of nodes,
* and fills it to be 30% full (with a single file replicated identically * and fills it to be 30% full (with a single file replicated identically
* to all datanodes); * to all datanodes);
@ -765,11 +783,13 @@ public class TestBalancer {
* parsing, etc. Otherwise invoke balancer API directly. * parsing, etc. Otherwise invoke balancer API directly.
* @param useFile - if true, the hosts to included or excluded will be stored in a * @param useFile - if true, the hosts to included or excluded will be stored in a
* file and then later read from the file. * file and then later read from the file.
* @param useNamesystemSpy - spy on FSNamesystem if true
* @throws Exception * @throws Exception
*/ */
private void doTest(Configuration conf, long[] capacities, private void doTest(Configuration conf, long[] capacities,
String[] racks, long newCapacity, String newRack, NewNodeInfo nodes, String[] racks, long newCapacity, String newRack, NewNodeInfo nodes,
boolean useTool, boolean useFile) throws Exception { boolean useTool, boolean useFile,
boolean useNamesystemSpy) throws Exception {
LOG.info("capacities = " + long2String(capacities)); LOG.info("capacities = " + long2String(capacities));
LOG.info("racks = " + Arrays.asList(racks)); LOG.info("racks = " + Arrays.asList(racks));
LOG.info("newCapacity= " + newCapacity); LOG.info("newCapacity= " + newCapacity);
@ -777,15 +797,25 @@ public class TestBalancer {
LOG.info("useTool = " + useTool); LOG.info("useTool = " + useTool);
assertEquals(capacities.length, racks.length); assertEquals(capacities.length, racks.length);
int numOfDatanodes = capacities.length; int numOfDatanodes = capacities.length;
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(capacities.length)
.racks(racks)
.simulatedCapacities(capacities)
.build();
try { try {
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0)
.build();
cluster.getConfiguration(0).setInt(DFSConfigKeys.DFS_REPLICATION_KEY,
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY,
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
if(useNamesystemSpy) {
LOG.info("Using Spy Namesystem");
spyFSNamesystem(cluster.getNameNode());
}
cluster.startDataNodes(conf, numOfDatanodes, true,
StartupOption.REGULAR, racks, null, capacities, false);
cluster.waitClusterUp();
cluster.waitActive(); cluster.waitActive();
client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(), client = NameNodeProxies.createProxy(conf,
ClientProtocol.class).getProxy(); cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
long totalCapacity = sum(capacities); long totalCapacity = sum(capacities);
@ -865,9 +895,11 @@ public class TestBalancer {
runBalancer(conf, totalUsedSpace, totalCapacity, p, expectedExcludedNodes); runBalancer(conf, totalUsedSpace, totalCapacity, p, expectedExcludedNodes);
} }
} finally { } finally {
if(cluster != null) {
cluster.shutdown(); cluster.shutdown();
} }
} }
}
private void runBalancer(Configuration conf, long totalUsedSpace, private void runBalancer(Configuration conf, long totalUsedSpace,
long totalCapacity) throws Exception { long totalCapacity) throws Exception {
@ -1906,6 +1938,54 @@ public class TestBalancer {
} }
} }
// Statistics collected by the getBlocks spy. The spy's answer may run on
// several NameNode RPC handler threads at once while the test thread reads
// the values, so the fields are volatile and updated under a lock.
private static final Object GET_BLOCKS_STATS_LOCK = new Object();
private static volatile int numGetBlocksCalls;
private static volatile long startGetBlocksTime, endGetBlocksTime;

/**
 * Installs a spy FSNamesystem on the given NameNode and stubs getBlocks so
 * that every call still runs the real method, then records the time the
 * call finished and increments the call counter.
 * The counter and both timestamps are reset before the stub is installed.
 *
 * @param nn the NameNode whose namesystem is replaced by a spy
 * @throws IOException declared by the stubbed getBlocks signature
 */
private void spyFSNamesystem(NameNode nn) throws IOException {
  FSNamesystem fsnSpy = NameNodeAdapter.spyOnNamesystem(nn);
  numGetBlocksCalls = 0;
  endGetBlocksTime = startGetBlocksTime = Time.monotonicNow();
  doAnswer(new Answer<BlocksWithLocations>() {
    @Override
    public BlocksWithLocations answer(InvocationOnMock invocation)
        throws Throwable {
      BlocksWithLocations blk =
          (BlocksWithLocations)invocation.callRealMethod();
      // ++ on a shared field is a read-modify-write; serialize it so
      // concurrent getBlocks calls cannot lose updates.
      synchronized (GET_BLOCKS_STATS_LOCK) {
        endGetBlocksTime = Time.monotonicNow();
        numGetBlocksCalls++;
      }
      return blk;
    }}).when(fsnSpy).getBlocks(any(DatanodeID.class), anyLong());
}
/**
 * Verifies that the Balancer disperses its getBlocks RPCs to the NameNode
 * in order to avoid saturating the NN's RPC queue.
 * Runs the balancer on a 40-datanode cluster with 30 dispatcher threads and
 * a spy on FSNamesystem, then checks that at least
 * Dispatcher.BALANCER_NUM_RPC_PER_SEC getBlocks calls were made and that
 * their average rate stayed below that limit.
 */
void testBalancerRPCDelay() throws Exception {
  final Configuration conf = new HdfsConfiguration();
  initConf(conf);
  conf.setInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, 30);

  int numDNs = 40;
  long[] capacities = new long[numDNs];
  String[] racks = new String[numDNs];
  for(int i = 0; i < numDNs; i++) {
    capacities[i] = CAPACITY;
    // first half of the datanodes on RACK0, second half on RACK1
    racks[i] = (i < numDNs/2 ? RACK0 : RACK1);
  }
  doTest(conf, capacities, racks, CAPACITY, RACK2,
      new PortNumberBasedNodes(3, 0, 0), false, false, true);
  // "not less than" is >=, so exactly the limit must be accepted too.
  assertTrue("Number of getBlocks should be not less than " +
      Dispatcher.BALANCER_NUM_RPC_PER_SEC,
      numGetBlocksCalls >= Dispatcher.BALANCER_NUM_RPC_PER_SEC);
  // +1 keeps the divisor non-zero when start and end times coincide.
  long d = 1 + endGetBlocksTime - startGetBlocksTime;
  LOG.info("Balancer executed " + numGetBlocksCalls
      + " getBlocks in " + d + " msec.");
  // 1000L forces long arithmetic so the product cannot overflow int.
  assertTrue("Expected BALANCER_NUM_RPC_PER_SEC = " +
      Dispatcher.BALANCER_NUM_RPC_PER_SEC,
      (numGetBlocksCalls * 1000L / d) < Dispatcher.BALANCER_NUM_RPC_PER_SEC);
}
/** /**
* @param args * @param args
*/ */

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.balancer;
import org.junit.Test;
/**
 * Runs the Balancer RPC dispersal scenario from a test class of its own.
 * The scenario checks that the Balancer spreads out its RPCs to the
 * NameNode so that the NN's RPC queue is not saturated.
 */
public class TestBalancerRPCDelay {

  @Test(timeout = 100000)
  public void testBalancerRPCDelay() throws Exception {
    // The scenario itself lives in TestBalancer; this class only drives it.
    final TestBalancer balancerTest = new TestBalancer();
    balancerTest.testBalancerRPCDelay();
  }
}

View File

@ -143,6 +143,11 @@ public class BlockManagerTestUtil {
} }
} }
/**
 * @return the HeartbeatManager obtained from the DatanodeManager of the
 *         given block manager
 */
public static HeartbeatManager getHeartbeatManager(
    final BlockManager blockManager) {
  final DatanodeManager datanodeManager = blockManager.getDatanodeManager();
  return datanodeManager.getHeartbeatManager();
}
/** /**
* @return corruptReplicas from block manager * @return corruptReplicas from block manager
*/ */

View File

@ -24,6 +24,7 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
@ -185,6 +186,34 @@ public class NameNodeAdapter {
return fsn.getStats(); return fsn.getStats();
} }
/**
 * Replaces the FSNamesystem of the given NameNode with a Mockito spy of it.
 * The spy is assigned to nn.namesystem and is also written, by reflection,
 * into the fields that reference the namesystem in the NameNodeRpcServer,
 * BlockManager, LeaseManager, DatanodeManager and HeartbeatManager.
 * The swap is done while holding the write lock of both the original
 * namesystem and the spy.
 *
 * @param nn the NameNode whose namesystem is to be spied on
 * @return the spy that now stands in for the NameNode's FSNamesystem
 * @throws RuntimeException if one of the reflective field writes fails
 *         with IllegalAccessException
 */
public static FSNamesystem spyOnNamesystem(NameNode nn) {
FSNamesystem fsnSpy = Mockito.spy(nn.getNamesystem());
FSNamesystem fsnOld = nn.namesystem;
// Lock both objects before swapping references; released in reverse order
// in the finally block below.
// NOTE(review): the spy presumably shares the original's lock object, which
// would make the second acquisition reentrant - confirm.
fsnOld.writeLock();
fsnSpy.writeLock();
nn.namesystem = fsnSpy;
try {
// Each target holds its own reference to the namesystem in a declared
// field; overwrite it by name. The trailing 'true' is FieldUtils'
// forceAccess flag.
FieldUtils.writeDeclaredField(
(NameNodeRpcServer)nn.getRpcServer(), "namesystem", fsnSpy, true);
FieldUtils.writeDeclaredField(
fsnSpy.getBlockManager(), "namesystem", fsnSpy, true);
// LeaseManager names its field "fsnamesystem", unlike the other targets.
FieldUtils.writeDeclaredField(
fsnSpy.getLeaseManager(), "fsnamesystem", fsnSpy, true);
FieldUtils.writeDeclaredField(
fsnSpy.getBlockManager().getDatanodeManager(),
"namesystem", fsnSpy, true);
FieldUtils.writeDeclaredField(
BlockManagerTestUtil.getHeartbeatManager(fsnSpy.getBlockManager()),
"namesystem", fsnSpy, true);
} catch (IllegalAccessException e) {
// Surface as unchecked, keeping the original exception as the cause.
throw new RuntimeException("Cannot set spy FSNamesystem", e);
} finally {
fsnSpy.writeUnlock();
fsnOld.writeUnlock();
}
return fsnSpy;
}
public static ReentrantReadWriteLock spyOnFsLock(FSNamesystem fsn) { public static ReentrantReadWriteLock spyOnFsLock(FSNamesystem fsn) {
ReentrantReadWriteLock spy = Mockito.spy(fsn.getFsLockForTests()); ReentrantReadWriteLock spy = Mockito.spy(fsn.getFsLockForTests());
fsn.setFsLockForTests(spy); fsn.setFsLockForTests(spy);