HDFS-15665. Balancer logging improvements. Contributed by Konstantin V Shvachko.
This commit is contained in:
parent
7f5caca04c
commit
d07dc7afb4
|
@ -282,6 +282,9 @@ public class Balancer {
|
|||
*/
|
||||
Balancer(NameNodeConnector theblockpool, BalancerParameters p,
|
||||
Configuration conf) {
|
||||
// NameNode configuration parameters for balancing
|
||||
getInt(conf, DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT);
|
||||
final long movedWinWidth = getLong(conf,
|
||||
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
|
||||
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
|
||||
|
@ -291,10 +294,6 @@ public class Balancer {
|
|||
final int dispatcherThreads = getInt(conf,
|
||||
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
|
||||
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT);
|
||||
final int maxConcurrentMovesPerNode = getInt(conf,
|
||||
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
|
||||
|
||||
final long getBlocksSize = getLongBytes(conf,
|
||||
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_KEY,
|
||||
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_DEFAULT);
|
||||
|
@ -311,6 +310,13 @@ public class Balancer {
|
|||
DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY,
|
||||
DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT);
|
||||
|
||||
// DataNode configuration parameters for balancing
|
||||
final int maxConcurrentMovesPerNode = getInt(conf,
|
||||
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
|
||||
getLongBytes(conf, DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT);
|
||||
|
||||
this.nnc = theblockpool;
|
||||
this.dispatcher =
|
||||
new Dispatcher(theblockpool, p.getIncludedNodes(),
|
||||
|
@ -603,12 +609,13 @@ public class Balancer {
|
|||
this.bytesAlreadyMoved = bytesAlreadyMoved;
|
||||
}
|
||||
|
||||
void print(int iteration, PrintStream out) {
|
||||
out.printf("%-24s %10d %19s %18s %17s%n",
|
||||
void print(int iteration, NameNodeConnector nnc, PrintStream out) {
|
||||
out.printf("%-24s %10d %19s %18s %17s %s%n",
|
||||
DateFormat.getDateTimeInstance().format(new Date()), iteration,
|
||||
StringUtils.byteDesc(bytesAlreadyMoved),
|
||||
StringUtils.byteDesc(bytesLeftToMove),
|
||||
StringUtils.byteDesc(bytesBeingMoved));
|
||||
StringUtils.byteDesc(bytesBeingMoved),
|
||||
nnc.getNameNodeUri());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -653,8 +660,10 @@ public class Balancer {
|
|||
System.out.println("No block can be moved. Exiting...");
|
||||
return newResult(ExitStatus.NO_MOVE_BLOCK, bytesLeftToMove, bytesBeingMoved);
|
||||
} else {
|
||||
LOG.info( "Will move " + StringUtils.byteDesc(bytesBeingMoved) +
|
||||
" in this iteration");
|
||||
LOG.info("Will move {} in this iteration for {}",
|
||||
StringUtils.byteDesc(bytesBeingMoved), nnc.toString());
|
||||
LOG.info("Total target DataNodes in this iteration: {}",
|
||||
dispatcher.moveTasksTotal());
|
||||
}
|
||||
|
||||
/* For each pair of <source, target>, start a thread that repeatedly
|
||||
|
@ -705,7 +714,9 @@ public class Balancer {
|
|||
LOG.info("excluded nodes = " + p.getExcludedNodes());
|
||||
LOG.info("source nodes = " + p.getSourceNodes());
|
||||
checkKeytabAndInit(conf);
|
||||
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
|
||||
System.out.println("Time Stamp Iteration#"
|
||||
+ " Bytes Already Moved Bytes Left To Move Bytes Being Moved"
|
||||
+ " NameNode");
|
||||
|
||||
List<NameNodeConnector> connectors = Collections.emptyList();
|
||||
try {
|
||||
|
@ -721,7 +732,7 @@ public class Balancer {
|
|||
|| p.getBlockPools().contains(nnc.getBlockpoolID())) {
|
||||
final Balancer b = new Balancer(nnc, p, conf);
|
||||
final Result r = b.runOneIteration();
|
||||
r.print(iteration, System.out);
|
||||
r.print(iteration, nnc, System.out);
|
||||
|
||||
// clean all lists
|
||||
b.resetData(conf);
|
||||
|
|
|
@ -392,7 +392,7 @@ public class Dispatcher {
|
|||
|
||||
sendRequest(out, eb, accessToken);
|
||||
receiveResponse(in);
|
||||
nnc.getBytesMoved().addAndGet(reportedBlock.getNumBytes());
|
||||
nnc.addBytesMoved(reportedBlock.getNumBytes());
|
||||
target.getDDatanode().setHasSuccess();
|
||||
LOG.info("Successfully moved " + this);
|
||||
} catch (IOException e) {
|
||||
|
@ -1064,6 +1064,10 @@ public class Dispatcher {
|
|||
return nnc.getBytesMoved().get();
|
||||
}
|
||||
|
||||
long getBblocksMoved() {
|
||||
return nnc.getBlocksMoved().get();
|
||||
}
|
||||
|
||||
long bytesToMove() {
|
||||
Preconditions.checkState(
|
||||
storageGroupMap.size() >= sources.size() + targets.size(),
|
||||
|
@ -1083,6 +1087,14 @@ public class Dispatcher {
|
|||
targets.add(target);
|
||||
}
|
||||
|
||||
public int moveTasksTotal() {
|
||||
int b = 0;
|
||||
for (Source src : sources) {
|
||||
b += src.tasks.size();
|
||||
}
|
||||
return b;
|
||||
}
|
||||
|
||||
private boolean shouldIgnore(DatanodeInfo dn) {
|
||||
// ignore out-of-service nodes
|
||||
final boolean outOfService = !dn.isInService();
|
||||
|
@ -1164,12 +1176,13 @@ public class Dispatcher {
|
|||
*/
|
||||
private long dispatchBlockMoves() throws InterruptedException {
|
||||
final long bytesLastMoved = getBytesMoved();
|
||||
final long blocksLastMoved = getBblocksMoved();
|
||||
final Future<?>[] futures = new Future<?>[sources.size()];
|
||||
|
||||
int concurrentThreads = Math.min(sources.size(),
|
||||
((ThreadPoolExecutor)dispatchExecutor).getCorePoolSize());
|
||||
assert concurrentThreads > 0 : "Number of concurrent threads is 0.";
|
||||
LOG.debug("Balancer concurrent dispatcher threads = {}", concurrentThreads);
|
||||
LOG.info("Balancer concurrent dispatcher threads = {}", concurrentThreads);
|
||||
|
||||
// Determine the size of each mover thread pool per target
|
||||
int threadsPerTarget = maxMoverThreads/targets.size();
|
||||
|
@ -1211,6 +1224,9 @@ public class Dispatcher {
|
|||
|
||||
// wait for all reportedBlock moving to be done
|
||||
waitForMoveCompletion(targets);
|
||||
LOG.info("Total bytes (blocks) moved in this iteration {} ({})",
|
||||
StringUtils.byteDesc(getBytesMoved() - bytesLastMoved),
|
||||
(getBblocksMoved() - blocksLastMoved));
|
||||
|
||||
return getBytesMoved() - bytesLastMoved;
|
||||
}
|
||||
|
|
|
@ -162,6 +162,7 @@ public class NameNodeConnector implements Closeable {
|
|||
private OutputStream out;
|
||||
private final List<Path> targetPaths;
|
||||
private final AtomicLong bytesMoved = new AtomicLong();
|
||||
private final AtomicLong blocksMoved = new AtomicLong();
|
||||
|
||||
private final int maxNotChangedIterations;
|
||||
private int notChangedIterations = 0;
|
||||
|
@ -233,6 +234,19 @@ public class NameNodeConnector implements Closeable {
|
|||
return bytesMoved;
|
||||
}
|
||||
|
||||
AtomicLong getBlocksMoved() {
|
||||
return blocksMoved;
|
||||
}
|
||||
|
||||
public void addBytesMoved(long numBytes) {
|
||||
bytesMoved.addAndGet(numBytes);
|
||||
blocksMoved.incrementAndGet();
|
||||
}
|
||||
|
||||
public URI getNameNodeUri() {
|
||||
return nameNodeUri;
|
||||
}
|
||||
|
||||
/** @return blocks with locations. */
|
||||
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
|
||||
minBlockSize) throws IOException {
|
||||
|
|
|
@ -1018,7 +1018,7 @@ public class TestBalancer {
|
|||
for(NameNodeConnector nnc : connectors) {
|
||||
final Balancer b = new Balancer(nnc, p, conf);
|
||||
final Result r = b.runOneIteration();
|
||||
r.print(iteration, System.out);
|
||||
r.print(iteration, nnc, System.out);
|
||||
|
||||
// clean all lists
|
||||
b.resetData(conf);
|
||||
|
|
Loading…
Reference in New Issue