HDFS-6595. Allow the maximum threads for balancing on datanodes to be configurable. Contributed by Benoy Antony

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1605565 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2014-06-25 19:27:29 +00:00
parent 1a3a7e0c1a
commit e3612e4428
5 changed files with 52 additions and 15 deletions

View File

@ -476,6 +476,9 @@ Release 2.5.0 - UNRELEASED
HDFS-6593. Move SnapshotDiffInfo out of INodeDirectorySnapshottable.
(Jing Zhao via wheat9)
HDFS-6595. Allow the maximum threads for balancing on datanodes to be
configurable. (Benoy Antony via szetszwo)
OPTIMIZATIONS
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)

View File

@ -105,6 +105,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.backup.dnrpc-address";
public static final String DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY = "dfs.datanode.balance.bandwidthPerSec";
public static final long DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024;
public static final String DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY = "dfs.datanode.balance.max.concurrent.moves";
public static final int DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT = 5;
public static final String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
public static final long DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 4 * 1024 * 1024; // 4MB
public static final String DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY = "dfs.datanode.drop.cache.behind.writes";

View File

@ -189,7 +189,6 @@ public class Balancer {
/** The maximum number of concurrent blocks moves for
* balancing purpose at a datanode
*/
public static final int MAX_NUM_CONCURRENT_MOVES = 5;
private static final int MAX_NO_PENDING_BLOCK_ITERATIONS = 5;
public static final long DELAY_AFTER_ERROR = 10 * 1000L; //10 seconds
public static final int BLOCK_MOVE_READ_TIMEOUT=20*60*1000; // 20 minutes
@ -231,6 +230,7 @@ public class Balancer {
private final ExecutorService moverExecutor;
private final ExecutorService dispatcherExecutor;
private final int maxConcurrentMovesPerNode;
/* This class keeps track of a scheduled block move */
private class PendingBlockMove {
@ -516,8 +516,8 @@ public class Balancer {
private long scheduledSize = 0L;
protected long delayUntil = 0L;
// blocks being moved but not confirmed yet
private final List<PendingBlockMove> pendingBlocks =
new ArrayList<PendingBlockMove>(MAX_NUM_CONCURRENT_MOVES);
private final List<PendingBlockMove> pendingBlocks;
private final int maxConcurrentMoves;
@Override
public String toString() {
@ -528,7 +528,8 @@ public class Balancer {
/* Constructor
* Depending on avgutil & threshold, calculate maximum bytes to move
*/
private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold) {
private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold,
int maxConcurrentMoves) {
datanode = node;
utilization = policy.getUtilization(node);
final double avgUtil = policy.getAvgUtilization();
@ -545,6 +546,8 @@ public class Balancer {
maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove);
}
this.maxSize2Move = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
this.maxConcurrentMoves = maxConcurrentMoves;
this.pendingBlocks = new ArrayList<PendingBlockMove>(maxConcurrentMoves);
}
/** Get the datanode */
@ -606,7 +609,7 @@ public class Balancer {
/* Check if the node can schedule more blocks to move */
synchronized private boolean isPendingQNotFull() {
if ( pendingBlocks.size() < MAX_NUM_CONCURRENT_MOVES ) {
if ( pendingBlocks.size() < this.maxConcurrentMoves ) {
return true;
}
return false;
@ -655,8 +658,9 @@ public class Balancer {
= new ArrayList<BalancerBlock>();
/* constructor */
private Source(DatanodeInfo node, BalancingPolicy policy, double threshold) {
super(node, policy, threshold);
private Source(DatanodeInfo node, BalancingPolicy policy, double threshold,
int maxConcurrentMoves) {
super(node, policy, threshold, maxConcurrentMoves);
}
/** Add a node task */
@ -869,6 +873,9 @@ public class Balancer {
this.dispatcherExecutor = Executors.newFixedThreadPool(
conf.getInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT));
this.maxConcurrentMovesPerNode =
conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
}
/* Given a data node set, build a network topology and decide
@ -908,7 +915,7 @@ public class Balancer {
BalancerDatanode datanodeS;
final double avg = policy.getAvgUtilization();
if (policy.getUtilization(datanode) >= avg) {
datanodeS = new Source(datanode, policy, threshold);
datanodeS = new Source(datanode, policy, threshold, maxConcurrentMovesPerNode);
if (isAboveAvgUtilized(datanodeS)) {
this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
} else {
@ -919,7 +926,8 @@ public class Balancer {
-threshold)*datanodeS.datanode.getCapacity()/100.0);
}
} else {
datanodeS = new BalancerDatanode(datanode, policy, threshold);
datanodeS = new BalancerDatanode(datanode, policy, threshold,
maxConcurrentMovesPerNode);
if ( isBelowOrEqualAvgUtilized(datanodeS)) {
this.belowAvgUtilizedDatanodes.add(datanodeS);
} else {

View File

@ -63,14 +63,17 @@ class DataXceiverServer implements Runnable {
*/
static class BlockBalanceThrottler extends DataTransferThrottler {
private int numThreads;
private int maxThreads;
/**Constructor
*
* @param bandwidth Total amount of bandwidth can be used for balancing
*/
private BlockBalanceThrottler(long bandwidth) {
private BlockBalanceThrottler(long bandwidth, int maxThreads) {
super(bandwidth);
this.maxThreads = maxThreads;
LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s");
LOG.info("Number threads for balancing is "+ maxThreads);
}
/** Check if the block move can start.
@ -79,7 +82,7 @@ class DataXceiverServer implements Runnable {
* the counter is incremented; False otherwise.
*/
synchronized boolean acquire() {
if (numThreads >= Balancer.MAX_NUM_CONCURRENT_MOVES) {
if (numThreads >= maxThreads) {
return false;
}
numThreads++;
@ -120,8 +123,10 @@ class DataXceiverServer implements Runnable {
//set up parameter for cluster balancing
this.balanceThrottler = new BlockBalanceThrottler(
conf.getLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT));
conf.getLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT),
conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT));
}
@Override

View File

@ -370,8 +370,13 @@ public class TestBalancer {
// start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) {
assertEquals(Balancer.ReturnStatus.NO_MOVE_PROGRESS.code, r);
return;
} else {
assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
}
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
LOG.info("Rebalancing with default ctor.");
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster);
@ -462,6 +467,20 @@ public class TestBalancer {
new String[] {RACK0, RACK1});
}
@Test(timeout=100000)
public void testBalancerWithZeroThreadsForMove() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 0);
testBalancer1Internal (conf);
}
@Test(timeout=100000)
public void testBalancerWithNonZeroThreadsForMove() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 8);
testBalancer1Internal (conf);
}
@Test(timeout=100000)
public void testBalancer2() throws Exception {
testBalancer2Internal(new HdfsConfiguration());