diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 753a2703ec3..48938bf5546 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -282,6 +282,9 @@ static int getFailedTimesSinceLastSuccessfulBalance() {
    */
   Balancer(NameNodeConnector theblockpool, BalancerParameters p,
       Configuration conf) {
+    // NameNode configuration parameters for balancing
+    getInt(conf, DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY,
+        DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT);
     final long movedWinWidth = getLong(conf,
         DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
         DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
@@ -291,10 +294,6 @@ static int getFailedTimesSinceLastSuccessfulBalance() {
     final int dispatcherThreads = getInt(conf,
         DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
         DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT);
-    final int maxConcurrentMovesPerNode = getInt(conf,
-        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
-        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
-
     final long getBlocksSize = getLongBytes(conf,
         DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_KEY,
         DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_DEFAULT);
@@ -311,6 +310,13 @@ static int getFailedTimesSinceLastSuccessfulBalance() {
         DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY,
         DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT);
 
+    // DataNode configuration parameters for balancing
+    final int maxConcurrentMovesPerNode = getInt(conf,
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+    getLongBytes(conf, DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
+        DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT);
+
     this.nnc = theblockpool;
     this.dispatcher =
         new Dispatcher(theblockpool, p.getIncludedNodes(),
@@ -603,12 +609,13 @@ static class Result {
       this.bytesAlreadyMoved = bytesAlreadyMoved;
     }
 
-    void print(int iteration, PrintStream out) {
-      out.printf("%-24s %10d  %19s  %18s  %17s%n",
+    void print(int iteration, NameNodeConnector nnc, PrintStream out) {
+      out.printf("%-24s %10d  %19s  %18s  %17s  %s%n",
           DateFormat.getDateTimeInstance().format(new Date()), iteration,
           StringUtils.byteDesc(bytesAlreadyMoved),
           StringUtils.byteDesc(bytesLeftToMove),
-          StringUtils.byteDesc(bytesBeingMoved));
+          StringUtils.byteDesc(bytesBeingMoved),
+          nnc.getNameNodeUri());
     }
   }
 
@@ -653,8 +660,10 @@ Result runOneIteration() {
Exiting..."); return newResult(ExitStatus.NO_MOVE_BLOCK, bytesLeftToMove, bytesBeingMoved); } else { - LOG.info( "Will move " + StringUtils.byteDesc(bytesBeingMoved) + - " in this iteration"); + LOG.info("Will move {} in this iteration for {}", + StringUtils.byteDesc(bytesBeingMoved), nnc.toString()); + LOG.info("Total target DataNodes in this iteration: {}", + dispatcher.moveTasksTotal()); } /* For each pair of , start a thread that repeatedly @@ -705,7 +714,9 @@ static private int doBalance(Collection namenodes, LOG.info("excluded nodes = " + p.getExcludedNodes()); LOG.info("source nodes = " + p.getSourceNodes()); checkKeytabAndInit(conf); - System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved"); + System.out.println("Time Stamp Iteration#" + + " Bytes Already Moved Bytes Left To Move Bytes Being Moved" + + " NameNode"); List connectors = Collections.emptyList(); try { @@ -721,7 +732,7 @@ static private int doBalance(Collection namenodes, || p.getBlockPools().contains(nnc.getBlockpoolID())) { final Balancer b = new Balancer(nnc, p, conf); final Result r = b.runOneIteration(); - r.print(iteration, System.out); + r.print(iteration, nnc, System.out); // clean all lists b.resetData(conf); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index c222270882f..885cd63786d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -392,7 +392,7 @@ private void dispatch() { sendRequest(out, eb, accessToken); receiveResponse(in); - nnc.getBytesMoved().addAndGet(reportedBlock.getNumBytes()); + nnc.addBytesMoved(reportedBlock.getNumBytes()); target.getDDatanode().setHasSuccess(); LOG.info("Successfully moved " + this); } catch (IOException e) { @@ -1064,6 +1064,10 @@ long getBytesMoved() { return nnc.getBytesMoved().get(); } + long getBblocksMoved() { + return nnc.getBlocksMoved().get(); + } + long bytesToMove() { Preconditions.checkState( storageGroupMap.size() >= sources.size() + targets.size(), @@ -1083,6 +1087,14 @@ void add(Source source, StorageGroup target) { targets.add(target); } + public int moveTasksTotal() { + int b = 0; + for (Source src : sources) { + b += src.tasks.size(); + } + return b; + } + private boolean shouldIgnore(DatanodeInfo dn) { // ignore out-of-service nodes final boolean outOfService = !dn.isInService(); @@ -1164,12 +1176,13 @@ public boolean dispatchAndCheckContinue() throws InterruptedException { */ private long dispatchBlockMoves() throws InterruptedException { final long bytesLastMoved = getBytesMoved(); + final long blocksLastMoved = getBblocksMoved(); final Future[] futures = new Future[sources.size()]; int concurrentThreads = Math.min(sources.size(), ((ThreadPoolExecutor)dispatchExecutor).getCorePoolSize()); assert concurrentThreads > 0 : "Number of concurrent threads is 0."; - LOG.debug("Balancer concurrent dispatcher threads = {}", concurrentThreads); + LOG.info("Balancer concurrent dispatcher threads = {}", concurrentThreads); // Determine the size of each mover thread pool per target int threadsPerTarget = maxMoverThreads/targets.size(); @@ -1211,6 +1224,9 @@ public void run() { // wait for all reportedBlock moving to be done waitForMoveCompletion(targets); + LOG.info("Total bytes (blocks) 
+    LOG.info("Total bytes (blocks) moved in this iteration {} ({})",
+        StringUtils.byteDesc(getBytesMoved() - bytesLastMoved),
+        (getBlocksMoved() - blocksLastMoved));
 
     return getBytesMoved() - bytesLastMoved;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 586c68048a9..bc20863d921 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -162,6 +162,7 @@ public static void checkOtherInstanceRunning(boolean toCheck) {
   private OutputStream out;
   private final List<Path> targetPaths;
   private final AtomicLong bytesMoved = new AtomicLong();
+  private final AtomicLong blocksMoved = new AtomicLong();
   private final int maxNotChangedIterations;
   private int notChangedIterations = 0;
 
@@ -233,6 +234,19 @@ AtomicLong getBytesMoved() {
     return bytesMoved;
   }
 
+  AtomicLong getBlocksMoved() {
+    return blocksMoved;
+  }
+
+  public void addBytesMoved(long numBytes) {
+    bytesMoved.addAndGet(numBytes);
+    blocksMoved.incrementAndGet();
+  }
+
+  public URI getNameNodeUri() {
+    return nameNodeUri;
+  }
+
   /** @return blocks with locations. */
   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long
       minBlockSize) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index d4ca119738e..a0f95f70a4d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -1018,7 +1018,7 @@ private static int runBalancer(Collection<URI> namenodes,
     for(NameNodeConnector nnc : connectors) {
       final Balancer b = new Balancer(nnc, p, conf);
       final Result r = b.runOneIteration();
-      r.print(iteration, System.out);
+      r.print(iteration, nnc, System.out);
 
       // clean all lists
       b.resetData(conf);