diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index f855e45332e..9270fdef17a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -49,6 +49,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -121,6 +122,7 @@ public class Dispatcher { /** The maximum number of concurrent blocks moves at a datanode */ private final int maxConcurrentMovesPerNode; + private final int maxMoverThreads; private final long getBlocksSize; private final long getBlocksMinBlockSize; @@ -139,11 +141,13 @@ public class Dispatcher { static class Allocator { private final int max; private int count = 0; + private int lotSize = 1; Allocator(int max) { this.max = max; } + /** Allocate specified number of items */ synchronized int allocate(int n) { final int remaining = max - count; if (remaining <= 0) { @@ -155,9 +159,19 @@ public class Dispatcher { } } + /** Aloocate a single lot of items */ + int allocate() { + return allocate(lotSize); + } + synchronized void reset() { count = 0; } + + /** Set the lot size */ + synchronized void setLotSize(int lotSize) { + this.lotSize = lotSize; + } } private static class GlobalBlockMap { @@ -1017,6 +1031,7 @@ public class Dispatcher { this.dispatchExecutor = dispatcherThreads == 0? null : Executors.newFixedThreadPool(dispatcherThreads); this.moverThreadAllocator = new Allocator(moverThreads); + this.maxMoverThreads = moverThreads; this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode; this.getBlocksSize = getBlocksSize; @@ -1116,7 +1131,7 @@ public class Dispatcher { final DDatanode targetDn = p.target.getDDatanode(); ExecutorService moveExecutor = targetDn.getMoveExecutor(); if (moveExecutor == null) { - final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode); + final int nThreads = moverThreadAllocator.allocate(); if (nThreads > 0) { moveExecutor = targetDn.initMoveExecutor(nThreads); } @@ -1166,6 +1181,25 @@ public class Dispatcher { LOG.debug("Disperse Interval sec = " + concurrentThreads / BALANCER_NUM_RPC_PER_SEC); } + + // Determine the size of each mover thread pool per target + int threadsPerTarget = maxMoverThreads/targets.size(); + if (threadsPerTarget == 0) { + // Some scheduled moves will get ignored as some targets won't have + // any threads allocated. + moverThreadAllocator.setLotSize(1); + LOG.warn(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY + "=" + + maxMoverThreads + " is too small for moving blocks to " + + targets.size() + " targets. Balancing may be slower."); + } else { + if (threadsPerTarget > maxConcurrentMovesPerNode) { + threadsPerTarget = maxConcurrentMovesPerNode; + LOG.info("Limiting threads per target to the specified max."); + } + moverThreadAllocator.setLotSize(threadsPerTarget); + LOG.info("Allocating " + threadsPerTarget + " threads per target."); + } + long dSec = 0; final Iterator i = sources.iterator(); for (int j = 0; j < futures.length; j++) {