diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 8ff46e37aee..6cdd01ff6e0 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -804,6 +804,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final long DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L; public static final String DFS_MOVER_MOVERTHREADS_KEY = "dfs.mover.moverThreads"; public static final int DFS_MOVER_MOVERTHREADS_DEFAULT = 1000; + public static final String DFS_DATANODE_MOVER_MAX_NUM_CONCURRENT_MOVES_KEY = + "dfs.datanode.mover.max.concurrent.moves"; + public static final int DFS_DATANODE_MOVER_MAX_NUM_CONCURRENT_MOVES_DEFAULT = 10; public static final String DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY = "dfs.mover.retry.max.attempts"; public static final int DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT = 10; public static final String DFS_MOVER_KEYTAB_ENABLED_KEY = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index 5c66d669120..93fbeb1c412 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -1242,23 +1242,8 @@ public class Dispatcher { assert concurrentThreads > 0 : "Number of concurrent threads is 0."; LOG.info("Balancer concurrent dispatcher threads = {}", concurrentThreads); - // Determine the size of each mover thread pool per target - int threadsPerTarget = maxMoverThreads/targets.size(); - if (threadsPerTarget == 0) { - // Some scheduled moves will get ignored as some targets won't have - // any threads allocated. - moverThreadAllocator.setLotSize(1); - LOG.warn(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY + "=" + - maxMoverThreads + " is too small for moving blocks to " + - targets.size() + " targets. Balancing may be slower."); - } else { - if (threadsPerTarget > maxConcurrentMovesPerNode) { - threadsPerTarget = maxConcurrentMovesPerNode; - LOG.info("Limiting threads per target to the specified max."); - } - moverThreadAllocator.setLotSize(threadsPerTarget); - LOG.info("Allocating " + threadsPerTarget + " threads per target."); - } + // set threads per target + setAllocatorLotSize(targets.size()); final Iterator i = sources.iterator(); for (int j = 0; j < futures.length; j++) { @@ -1289,6 +1274,25 @@ public class Dispatcher { return getBytesMoved() - bytesLastMoved; } + public void setAllocatorLotSize(int targetNum) { + // Determine the size of each mover thread pool per target + int threadsPerTarget = maxMoverThreads / targetNum; + if (threadsPerTarget == 0) { + // Some scheduled moves will get ignored as some targets won't have + // any threads allocated. + moverThreadAllocator.setLotSize(1); + LOG.warn("maxMoverThreads = {} is too small for moving blocks to {} targets." + + " May be slower.", maxMoverThreads, targetNum); + } else { + if (threadsPerTarget > maxConcurrentMovesPerNode) { + threadsPerTarget = maxConcurrentMovesPerNode; + LOG.info("Limiting threads per target to the specified max."); + } + moverThreadAllocator.setLotSize(threadsPerTarget); + LOG.info("Allocating " + threadsPerTarget + " threads per target."); + } + } + /** * Wait for all reportedBlock move confirmations. * @return true if there is failed move execution diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java index c8c43967dd0..bb1ee9c20ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java @@ -105,6 +105,10 @@ public class Mover { return targets.get(uuid, storageType); } + private StorageGroupMap getTarget() { + return targets; + } + private static G get(StorageGroupMap map, MLocation ml) { return map.get(ml.datanode.getDatanodeUuid(), ml.storageType); } @@ -134,8 +138,8 @@ public class Mover { DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT); final int maxConcurrentMovesPerNode = conf.getInt( - DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, - DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT); + DFSConfigKeys.DFS_DATANODE_MOVER_MAX_NUM_CONCURRENT_MOVES_KEY, + DFSConfigKeys.DFS_DATANODE_MOVER_MAX_NUM_CONCURRENT_MOVES_DEFAULT); final int maxNoMoveInterval = conf.getInt( DFSConfigKeys.DFS_MOVER_MAX_NO_MOVE_INTERVAL_KEY, DFSConfigKeys.DFS_MOVER_MAX_NO_MOVE_INTERVAL_DEFAULT); @@ -176,6 +180,7 @@ public class Mover { storages.add(source, target); } } + dispatcher.setAllocatorLotSize(storages.getTarget().values().size()); } private void initStoragePolicies() throws IOException {