HDFS-11742. Improve balancer usability after HDFS-8818. Contributed by Kihwal Lee

(cherry picked from commit 8e3a992ecc)
This commit is contained in:
Kihwal Lee 2017-07-21 09:22:28 -05:00
parent f65dc6ee95
commit e229ffee64
1 changed files with 34 additions and 1 deletions

View File

@ -118,6 +118,7 @@ public class Dispatcher {
/** The maximum number of concurrent blocks moves at a datanode */
private final int maxConcurrentMovesPerNode;
private final int maxMoverThreads;
private final long getBlocksSize;
private final long getBlocksMinBlockSize;
@ -136,11 +137,13 @@ public class Dispatcher {
static class Allocator {
private final int max;
private int count = 0;
private int lotSize = 1;
Allocator(int max) {
this.max = max;
}
/** Allocate specified number of items */
synchronized int allocate(int n) {
final int remaining = max - count;
if (remaining <= 0) {
@ -152,9 +155,19 @@ public class Dispatcher {
}
}
/** Aloocate a single lot of items */
int allocate() {
return allocate(lotSize);
}
synchronized void reset() {
count = 0;
}
/** Set the lot size */
synchronized void setLotSize(int lotSize) {
this.lotSize = lotSize;
}
}
private static class GlobalBlockMap {
@ -921,6 +934,7 @@ public class Dispatcher {
this.dispatchExecutor = dispatcherThreads == 0? null
: Executors.newFixedThreadPool(dispatcherThreads);
this.moverThreadAllocator = new Allocator(moverThreads);
this.maxMoverThreads = moverThreads;
this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
this.getBlocksSize = getBlocksSize;
@ -1024,7 +1038,7 @@ public class Dispatcher {
final DDatanode targetDn = p.target.getDDatanode();
ExecutorService moveExecutor = targetDn.getMoveExecutor();
if (moveExecutor == null) {
final int nThreads = moverThreadAllocator.allocate(maxConcurrentMovesPerNode);
final int nThreads = moverThreadAllocator.allocate();
if (nThreads > 0) {
moveExecutor = targetDn.initMoveExecutor(nThreads);
}
@ -1075,6 +1089,25 @@ public class Dispatcher {
LOG.debug("Disperse Interval sec = " +
concurrentThreads / BALANCER_NUM_RPC_PER_SEC);
}
// Determine the size of each mover thread pool per target
int threadsPerTarget = maxMoverThreads/targets.size();
if (threadsPerTarget == 0) {
// Some scheduled moves will get ignored as some targets won't have
// any threads allocated.
moverThreadAllocator.setLotSize(1);
LOG.warn(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY + "=" +
maxMoverThreads + " is too small for moving blocks to " +
targets.size() + " targets. Balancing may be slower.");
} else {
if (threadsPerTarget > maxConcurrentMovesPerNode) {
threadsPerTarget = maxConcurrentMovesPerNode;
LOG.info("Limiting threads per target to the specified max.");
}
moverThreadAllocator.setLotSize(threadsPerTarget);
LOG.info("Allocating " + threadsPerTarget + " threads per target.");
}
long dSec = 0;
final Iterator<Source> i = sources.iterator();
for (int j = 0; j < futures.length; j++) {