HDFS-16763. MoverTool: Make valid for the number of mover threads per DN.

This commit is contained in:
wanghongbing 2022-09-07 23:30:02 +08:00
parent 42c8f61fec
commit dd300855d2
3 changed files with 31 additions and 19 deletions

View File

@ -804,6 +804,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L; public static final long DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
public static final String DFS_MOVER_MOVERTHREADS_KEY = "dfs.mover.moverThreads"; public static final String DFS_MOVER_MOVERTHREADS_KEY = "dfs.mover.moverThreads";
public static final int DFS_MOVER_MOVERTHREADS_DEFAULT = 1000; public static final int DFS_MOVER_MOVERTHREADS_DEFAULT = 1000;
public static final String DFS_DATANODE_MOVER_MAX_NUM_CONCURRENT_MOVES_KEY =
"dfs.datanode.mover.max.concurrent.moves";
public static final int DFS_DATANODE_MOVER_MAX_NUM_CONCURRENT_MOVES_DEFAULT = 10;
public static final String DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY = "dfs.mover.retry.max.attempts"; public static final String DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY = "dfs.mover.retry.max.attempts";
public static final int DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT = 10; public static final int DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT = 10;
public static final String DFS_MOVER_KEYTAB_ENABLED_KEY = public static final String DFS_MOVER_KEYTAB_ENABLED_KEY =

View File

@ -1242,23 +1242,8 @@ public class Dispatcher {
assert concurrentThreads > 0 : "Number of concurrent threads is 0."; assert concurrentThreads > 0 : "Number of concurrent threads is 0.";
LOG.info("Balancer concurrent dispatcher threads = {}", concurrentThreads); LOG.info("Balancer concurrent dispatcher threads = {}", concurrentThreads);
// Determine the size of each mover thread pool per target // set threads per target
int threadsPerTarget = maxMoverThreads/targets.size(); setAllocatorLotSize(targets.size());
if (threadsPerTarget == 0) {
// Some scheduled moves will get ignored as some targets won't have
// any threads allocated.
moverThreadAllocator.setLotSize(1);
LOG.warn(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY + "=" +
maxMoverThreads + " is too small for moving blocks to " +
targets.size() + " targets. Balancing may be slower.");
} else {
if (threadsPerTarget > maxConcurrentMovesPerNode) {
threadsPerTarget = maxConcurrentMovesPerNode;
LOG.info("Limiting threads per target to the specified max.");
}
moverThreadAllocator.setLotSize(threadsPerTarget);
LOG.info("Allocating " + threadsPerTarget + " threads per target.");
}
final Iterator<Source> i = sources.iterator(); final Iterator<Source> i = sources.iterator();
for (int j = 0; j < futures.length; j++) { for (int j = 0; j < futures.length; j++) {
@ -1289,6 +1274,25 @@ public class Dispatcher {
return getBytesMoved() - bytesLastMoved; return getBytesMoved() - bytesLastMoved;
} }
public void setAllocatorLotSize(int targetNum) {
// Determine the size of each mover thread pool per target
int threadsPerTarget = maxMoverThreads / targetNum;
if (threadsPerTarget == 0) {
// Some scheduled moves will get ignored as some targets won't have
// any threads allocated.
moverThreadAllocator.setLotSize(1);
LOG.warn("maxMoverThreads = {} is too small for moving blocks to {} targets."
+ " May be slower.", maxMoverThreads, targetNum);
} else {
if (threadsPerTarget > maxConcurrentMovesPerNode) {
threadsPerTarget = maxConcurrentMovesPerNode;
LOG.info("Limiting threads per target to the specified max.");
}
moverThreadAllocator.setLotSize(threadsPerTarget);
LOG.info("Allocating " + threadsPerTarget + " threads per target.");
}
}
/** /**
* Wait for all reportedBlock move confirmations. * Wait for all reportedBlock move confirmations.
* @return true if there is failed move execution * @return true if there is failed move execution

View File

@ -105,6 +105,10 @@ public class Mover {
return targets.get(uuid, storageType); return targets.get(uuid, storageType);
} }
private StorageGroupMap<StorageGroup> getTarget() {
return targets;
}
private static <G extends StorageGroup> G get(StorageGroupMap<G> map, MLocation ml) { private static <G extends StorageGroup> G get(StorageGroupMap<G> map, MLocation ml) {
return map.get(ml.datanode.getDatanodeUuid(), ml.storageType); return map.get(ml.datanode.getDatanodeUuid(), ml.storageType);
} }
@ -134,8 +138,8 @@ public class Mover {
DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT); DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
final int maxConcurrentMovesPerNode = conf.getInt( final int maxConcurrentMovesPerNode = conf.getInt(
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, DFSConfigKeys.DFS_DATANODE_MOVER_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT); DFSConfigKeys.DFS_DATANODE_MOVER_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
final int maxNoMoveInterval = conf.getInt( final int maxNoMoveInterval = conf.getInt(
DFSConfigKeys.DFS_MOVER_MAX_NO_MOVE_INTERVAL_KEY, DFSConfigKeys.DFS_MOVER_MAX_NO_MOVE_INTERVAL_KEY,
DFSConfigKeys.DFS_MOVER_MAX_NO_MOVE_INTERVAL_DEFAULT); DFSConfigKeys.DFS_MOVER_MAX_NO_MOVE_INTERVAL_DEFAULT);
@ -176,6 +180,7 @@ public class Mover {
storages.add(source, target); storages.add(source, target);
} }
} }
dispatcher.setAllocatorLotSize(storages.getTarget().values().size());
} }
private void initStoragePolicies() throws IOException { private void initStoragePolicies() throws IOException {