diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 69d92303955..d179e5c3c50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -222,6 +222,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6593. Move SnapshotDiffInfo out of INodeDirectorySnapshottable.
     (Jing Zhao via wheat9)
 
+    HDFS-6595. Allow the maximum threads for balancing on datanodes to be
+    configurable. (Benoy Antony via szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 4ceec4a871e..f386b20868f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -105,6 +105,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.backup.dnrpc-address";
   public static final String  DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY = "dfs.datanode.balance.bandwidthPerSec";
   public static final long    DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024;
+  public static final String  DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY = "dfs.datanode.balance.max.concurrent.moves";
+  public static final int     DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT = 5;
   public static final String  DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
   public static final long    DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 4 * 1024 * 1024; // 4MB
   public static final String  DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY = "dfs.datanode.drop.cache.behind.writes";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 5568a05d53a..1a590a912c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -189,7 +189,6 @@ public class Balancer {
   /** The maximum number of concurrent blocks moves for 
    * balancing purpose at a datanode
    */
-  public static final int MAX_NUM_CONCURRENT_MOVES = 5;
   private static final int MAX_NO_PENDING_BLOCK_ITERATIONS = 5;
   public static final long DELAY_AFTER_ERROR = 10 * 1000L; //10 seconds
   public static final int BLOCK_MOVE_READ_TIMEOUT=20*60*1000; // 20 minutes
@@ -231,6 +230,7 @@ public class Balancer {
 
   private final ExecutorService moverExecutor;
   private final ExecutorService dispatcherExecutor;
+  private final int maxConcurrentMovesPerNode;
 
   /* This class keeps track of a scheduled block move */
   private class PendingBlockMove {
@@ -516,8 +516,8 @@ public class Balancer {
     private long scheduledSize = 0L;
     protected long delayUntil = 0L;
     // blocks being moved but not confirmed yet
-    private final List<PendingBlockMove> pendingBlocks =
-        new ArrayList<PendingBlockMove>(MAX_NUM_CONCURRENT_MOVES);
+    private final List<PendingBlockMove> pendingBlocks;
+    private final int maxConcurrentMoves;
 
     @Override
     public String toString() {
@@ -528,7 +528,8 @@ public class Balancer {
     /* Constructor 
      * Depending on avgutil & threshold, calculate maximum bytes to move 
      */
-    private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold) {
+    private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold,
+        int maxConcurrentMoves) {
       datanode = node;
       utilization = policy.getUtilization(node);
       final double avgUtil = policy.getAvgUtilization();
@@ -545,6 +546,8 @@ public class Balancer {
         maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove);
       }
       this.maxSize2Move = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
+      this.maxConcurrentMoves = maxConcurrentMoves;
+      this.pendingBlocks = new ArrayList<PendingBlockMove>(maxConcurrentMoves);
     }
 
     /** Get the datanode */
@@ -606,7 +609,7 @@ public class Balancer {
     /* Check if the node can schedule more blocks to move */
     synchronized private boolean isPendingQNotFull() {
-      if ( pendingBlocks.size() < MAX_NUM_CONCURRENT_MOVES ) {
+      if ( pendingBlocks.size() < this.maxConcurrentMoves ) {
         return true;
       }
       return false;
     }
@@ -655,8 +658,9 @@ public class Balancer {
         = new ArrayList<BalancerBlock>();
 
     /* constructor */
-    private Source(DatanodeInfo node, BalancingPolicy policy, double threshold) {
-      super(node, policy, threshold);
+    private Source(DatanodeInfo node, BalancingPolicy policy, double threshold,
+        int maxConcurrentMoves) {
+      super(node, policy, threshold, maxConcurrentMoves);
     }
 
     /** Add a node task */
@@ -869,6 +873,9 @@ public class Balancer {
     this.dispatcherExecutor = Executors.newFixedThreadPool(
         conf.getInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
             DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT));
+    this.maxConcurrentMovesPerNode =
+        conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+            DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
   }
 
   /* Given a data node set, build a network topology and decide
@@ -908,7 +915,7 @@ public class Balancer {
       BalancerDatanode datanodeS;
       final double avg = policy.getAvgUtilization();
       if (policy.getUtilization(datanode) > avg) {
-        datanodeS = new Source(datanode, policy, threshold);
+        datanodeS = new Source(datanode, policy, threshold, maxConcurrentMovesPerNode);
         if (isAboveAvgUtilized(datanodeS)) {
           this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
         } else {
@@ -919,7 +926,8 @@ public class Balancer {
               -threshold)*datanodeS.datanode.getCapacity()/100.0);
         }
       } else {
-        datanodeS = new BalancerDatanode(datanode, policy, threshold);
+        datanodeS = new BalancerDatanode(datanode, policy, threshold,
+            maxConcurrentMovesPerNode);
         if ( isBelowOrEqualAvgUtilized(datanodeS)) {
           this.belowAvgUtilizedDatanodes.add(datanodeS);
         } else {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
index 6f7310f157a..27dc1bf7457 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
@@ -63,14 +63,17 @@ class DataXceiverServer implements Runnable {
    */
   static class BlockBalanceThrottler extends DataTransferThrottler {
    private int numThreads;
+   private int maxThreads;
 
    /**Constructor
     * 
     * @param bandwidth Total amount of bandwidth can be used for balancing 
     */
-   private BlockBalanceThrottler(long bandwidth) {
+   private BlockBalanceThrottler(long bandwidth, int maxThreads) {
     super(bandwidth);
+    this.maxThreads = maxThreads;
     LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s");
+    LOG.info("Number threads for balancing is "+ maxThreads);
   }
 
   /** Check if the block move can start. 
@@ -79,7 +82,7 @@ class DataXceiverServer implements Runnable {
    * the counter is incremented; False otherwise.
    */
   synchronized boolean acquire() {
-    if (numThreads >= Balancer.MAX_NUM_CONCURRENT_MOVES) {
+    if (numThreads >= maxThreads) {
       return false;
     }
     numThreads++;
@@ -120,8 +123,10 @@ class DataXceiverServer implements Runnable {
 
     //set up parameter for cluster balancing
     this.balanceThrottler = new BlockBalanceThrottler(
-      conf.getLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
-          DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT));
+        conf.getLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
+            DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT),
+        conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+            DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT));
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index a463101ea16..600041021c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -370,8 +370,13 @@ public class TestBalancer {
     // start rebalancing
     Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
     final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
-    assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
-
+    if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) {
+      assertEquals(Balancer.ReturnStatus.NO_MOVE_PROGRESS.code, r);
+      return;
+    } else {
+      assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
+    }
     waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
     LOG.info("Rebalancing with default ctor.");
     waitForBalancer(totalUsedSpace, totalCapacity, client, cluster);
@@ -462,6 +467,20 @@ public class TestBalancer {
         new String[] {RACK0, RACK1});
   }
 
+  @Test(timeout=100000)
+  public void testBalancerWithZeroThreadsForMove() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 0);
+    testBalancer1Internal (conf);
+  }
+
+  @Test(timeout=100000)
+  public void testBalancerWithNonZeroThreadsForMove() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 8);
+    testBalancer1Internal (conf);
+  }
+
   @Test(timeout=100000)
   public void testBalancer2() throws Exception {
     testBalancer2Internal(new HdfsConfiguration());
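
Usage note (not part of the patch): the sketch below shows one way the new dfs.datanode.balance.max.concurrent.moves key could be exercised, mirroring what the new tests do. The class name and the value 8 are illustrative; only the DFSConfigKeys constants come from the patch. Because the BlockBalanceThrottler is built in the DataXceiverServer constructor, a datanode must be started (or restarted) with this configuration for the limit to take effect; operators would typically set the key in the datanode's hdfs-site.xml instead of in code.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    // Illustrative sketch: raise the per-datanode limit on concurrent block
    // moves used for balancing from the default of 5 to 8.
    public class MaxConcurrentMovesExample {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        conf.setInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 8);
        // Both the datanode (BlockBalanceThrottler) and the Balancer read the
        // same key, so one configuration can serve both sides.
        System.out.println("max concurrent moves = " + conf.getInt(
            DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
            DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT));
      }
    }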