HDFS-7364. Balancer always shows zero Bytes Already Moved. Contributed by Tsz Wo Nicholas Sze.

This commit is contained in:
Jing Zhao 2014-11-06 17:48:36 -08:00
parent a3839a9fbf
commit ae71a671a3
5 changed files with 142 additions and 45 deletions

View File

@ -1042,6 +1042,9 @@ Release 2.6.0 - UNRELEASED
file descriptors when SASL is enabled on DataTransferProtocol. file descriptors when SASL is enabled on DataTransferProtocol.
(Chris Nauroth via wheat9) (Chris Nauroth via wheat9)
HDFS-7364. Balancer always shows zero Bytes Already Moved.
(Tsz Wo Nicholas Sze via jing9)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an HDFS-6387. HDFS CLI admin tool for creating & deleting an

View File

@ -27,7 +27,6 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Date; import java.util.Date;
import java.util.Formatter;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
@ -162,7 +161,7 @@ import com.google.common.base.Preconditions;
public class Balancer { public class Balancer {
static final Log LOG = LogFactory.getLog(Balancer.class); static final Log LOG = LogFactory.getLog(Balancer.class);
private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id"); static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
private static final long GB = 1L << 30; //1GB private static final long GB = 1L << 30; //1GB
private static final long MAX_SIZE_TO_MOVE = 10*GB; private static final long MAX_SIZE_TO_MOVE = 10*GB;
@ -459,7 +458,7 @@ public class Balancer {
} }
/* reset all fields in a balancer preparing for the next iteration */ /* reset all fields in a balancer preparing for the next iteration */
private void resetData(Configuration conf) { void resetData(Configuration conf) {
this.overUtilized.clear(); this.overUtilized.clear();
this.aboveAvgUtilized.clear(); this.aboveAvgUtilized.clear();
this.belowAvgUtilized.clear(); this.belowAvgUtilized.clear();
@ -468,15 +467,46 @@ public class Balancer {
dispatcher.reset(conf);; dispatcher.reset(conf);;
} }
static class Result {
final ExitStatus exitStatus;
final long bytesLeftToMove;
final long bytesBeingMoved;
final long bytesAlreadyMoved;
Result(ExitStatus exitStatus, long bytesLeftToMove, long bytesBeingMoved,
long bytesAlreadyMoved) {
this.exitStatus = exitStatus;
this.bytesLeftToMove = bytesLeftToMove;
this.bytesBeingMoved = bytesBeingMoved;
this.bytesAlreadyMoved = bytesAlreadyMoved;
}
void print(int iteration, PrintStream out) {
out.printf("%-24s %10d %19s %18s %17s%n",
DateFormat.getDateTimeInstance().format(new Date()), iteration,
StringUtils.byteDesc(bytesAlreadyMoved),
StringUtils.byteDesc(bytesLeftToMove),
StringUtils.byteDesc(bytesBeingMoved));
}
}
Result newResult(ExitStatus exitStatus, long bytesLeftToMove, long bytesBeingMoved) {
return new Result(exitStatus, bytesLeftToMove, bytesBeingMoved,
dispatcher.getBytesMoved());
}
Result newResult(ExitStatus exitStatus) {
return new Result(exitStatus, -1, -1, dispatcher.getBytesMoved());
}
/** Run an iteration for all datanodes. */ /** Run an iteration for all datanodes. */
private ExitStatus run(int iteration, Formatter formatter, Result runOneIteration() {
Configuration conf) {
try { try {
final List<DatanodeStorageReport> reports = dispatcher.init(); final List<DatanodeStorageReport> reports = dispatcher.init();
final long bytesLeftToMove = init(reports); final long bytesLeftToMove = init(reports);
if (bytesLeftToMove == 0) { if (bytesLeftToMove == 0) {
System.out.println("The cluster is balanced. Exiting..."); System.out.println("The cluster is balanced. Exiting...");
return ExitStatus.SUCCESS; return newResult(ExitStatus.SUCCESS, bytesLeftToMove, -1);
} else { } else {
LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove) LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
+ " to make the cluster balanced." ); + " to make the cluster balanced." );
@ -487,23 +517,15 @@ public class Balancer {
* in this iteration. Maximum bytes to be moved per node is * in this iteration. Maximum bytes to be moved per node is
* Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE). * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE).
*/ */
final long bytesToMove = chooseStorageGroups(); final long bytesBeingMoved = chooseStorageGroups();
if (bytesToMove == 0) { if (bytesBeingMoved == 0) {
System.out.println("No block can be moved. Exiting..."); System.out.println("No block can be moved. Exiting...");
return ExitStatus.NO_MOVE_BLOCK; return newResult(ExitStatus.NO_MOVE_BLOCK, bytesLeftToMove, bytesBeingMoved);
} else { } else {
LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) + LOG.info( "Will move " + StringUtils.byteDesc(bytesBeingMoved) +
" in this iteration"); " in this iteration");
} }
formatter.format("%-24s %10d %19s %18s %17s%n",
DateFormat.getDateTimeInstance().format(new Date()),
iteration,
StringUtils.byteDesc(dispatcher.getBytesMoved()),
StringUtils.byteDesc(bytesLeftToMove),
StringUtils.byteDesc(bytesToMove)
);
/* For each pair of <source, target>, start a thread that repeatedly /* For each pair of <source, target>, start a thread that repeatedly
* decide a block to be moved and its proxy source, * decide a block to be moved and its proxy source,
* then initiates the move until all bytes are moved or no more block * then initiates the move until all bytes are moved or no more block
@ -511,19 +533,19 @@ public class Balancer {
* Exit no byte has been moved for 5 consecutive iterations. * Exit no byte has been moved for 5 consecutive iterations.
*/ */
if (!dispatcher.dispatchAndCheckContinue()) { if (!dispatcher.dispatchAndCheckContinue()) {
return ExitStatus.NO_MOVE_PROGRESS; return newResult(ExitStatus.NO_MOVE_PROGRESS, bytesLeftToMove, bytesBeingMoved);
} }
return ExitStatus.IN_PROGRESS; return newResult(ExitStatus.IN_PROGRESS, bytesLeftToMove, bytesBeingMoved);
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
System.out.println(e + ". Exiting ..."); System.out.println(e + ". Exiting ...");
return ExitStatus.ILLEGAL_ARGUMENTS; return newResult(ExitStatus.ILLEGAL_ARGUMENTS);
} catch (IOException e) { } catch (IOException e) {
System.out.println(e + ". Exiting ..."); System.out.println(e + ". Exiting ...");
return ExitStatus.IO_EXCEPTION; return newResult(ExitStatus.IO_EXCEPTION);
} catch (InterruptedException e) { } catch (InterruptedException e) {
System.out.println(e + ". Exiting ..."); System.out.println(e + ". Exiting ...");
return ExitStatus.INTERRUPTED; return newResult(ExitStatus.INTERRUPTED);
} finally { } finally {
dispatcher.shutdownNow(); dispatcher.shutdownNow();
} }
@ -545,7 +567,6 @@ public class Balancer {
LOG.info("namenodes = " + namenodes); LOG.info("namenodes = " + namenodes);
LOG.info("parameters = " + p); LOG.info("parameters = " + p);
final Formatter formatter = new Formatter(System.out);
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved"); System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
List<NameNodeConnector> connectors = Collections.emptyList(); List<NameNodeConnector> connectors = Collections.emptyList();
@ -559,14 +580,16 @@ public class Balancer {
Collections.shuffle(connectors); Collections.shuffle(connectors);
for(NameNodeConnector nnc : connectors) { for(NameNodeConnector nnc : connectors) {
final Balancer b = new Balancer(nnc, p, conf); final Balancer b = new Balancer(nnc, p, conf);
final ExitStatus r = b.run(iteration, formatter, conf); final Result r = b.runOneIteration();
r.print(iteration, System.out);
// clean all lists // clean all lists
b.resetData(conf); b.resetData(conf);
if (r == ExitStatus.IN_PROGRESS) { if (r.exitStatus == ExitStatus.IN_PROGRESS) {
done = false; done = false;
} else if (r != ExitStatus.SUCCESS) { } else if (r.exitStatus != ExitStatus.SUCCESS) {
//must be an error statue, return. //must be an error statue, return.
return r.getExitCode(); return r.exitStatus.getExitCode();
} }
} }

View File

@ -41,9 +41,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -76,6 +74,7 @@ import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
/** Dispatching block replica moves between datanodes. */ /** Dispatching block replica moves between datanodes. */
@ -119,8 +118,6 @@ public class Dispatcher {
/** The maximum number of concurrent blocks moves at a datanode */ /** The maximum number of concurrent blocks moves at a datanode */
private final int maxConcurrentMovesPerNode; private final int maxConcurrentMovesPerNode;
private final AtomicLong bytesMoved = new AtomicLong();
private static class GlobalBlockMap { private static class GlobalBlockMap {
private final Map<Block, DBlock> map = new HashMap<Block, DBlock>(); private final Map<Block, DBlock> map = new HashMap<Block, DBlock>();
@ -313,7 +310,7 @@ public class Dispatcher {
sendRequest(out, eb, accessToken); sendRequest(out, eb, accessToken);
receiveResponse(in); receiveResponse(in);
bytesMoved.addAndGet(block.getNumBytes()); nnc.getBytesMoved().addAndGet(block.getNumBytes());
LOG.info("Successfully moved " + this); LOG.info("Successfully moved " + this);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to move " + this + ": " + e.getMessage()); LOG.warn("Failed to move " + this + ": " + e.getMessage());
@ -803,7 +800,7 @@ public class Dispatcher {
} }
long getBytesMoved() { long getBytesMoved() {
return bytesMoved.get(); return nnc.getBytesMoved().get();
} }
long bytesToMove() { long bytesToMove() {
@ -889,7 +886,7 @@ public class Dispatcher {
* @return the total number of bytes successfully moved in this iteration. * @return the total number of bytes successfully moved in this iteration.
*/ */
private long dispatchBlockMoves() throws InterruptedException { private long dispatchBlockMoves() throws InterruptedException {
final long bytesLastMoved = bytesMoved.get(); final long bytesLastMoved = getBytesMoved();
final Future<?>[] futures = new Future<?>[sources.size()]; final Future<?>[] futures = new Future<?>[sources.size()];
final Iterator<Source> i = sources.iterator(); final Iterator<Source> i = sources.iterator();
@ -915,7 +912,7 @@ public class Dispatcher {
// wait for all block moving to be done // wait for all block moving to be done
waitForMoveCompletion(targets); waitForMoveCompletion(targets);
return bytesMoved.get() - bytesLastMoved; return getBytesMoved() - bytesLastMoved;
} }
/** The sleeping period before checking if block move is completed again */ /** The sleeping period before checking if block move is completed again */

View File

@ -28,6 +28,7 @@ import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -108,6 +109,7 @@ public class NameNodeConnector implements Closeable {
private final Path idPath; private final Path idPath;
private final OutputStream out; private final OutputStream out;
private final List<Path> targetPaths; private final List<Path> targetPaths;
private final AtomicLong bytesMoved = new AtomicLong();
private int notChangedIterations = 0; private int notChangedIterations = 0;
@ -148,6 +150,10 @@ public class NameNodeConnector implements Closeable {
return blockpoolID; return blockpoolID;
} }
AtomicLong getBytesMoved() {
return bytesMoved;
}
/** @return blocks with locations. */ /** @return blocks with locations. */
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
throws IOException { throws IOException {

View File

@ -17,7 +17,12 @@
*/ */
package org.apache.hadoop.hdfs.server.balancer; package org.apache.hadoop.hdfs.server.balancer;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_BYTES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LAZY_PERSIST_FILE_SCRUB_INTERVAL_SEC;
import static org.apache.hadoop.hdfs.StorageType.DEFAULT; import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
import static org.apache.hadoop.hdfs.StorageType.RAM_DISK; import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -31,6 +36,7 @@ import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
@ -44,7 +50,15 @@ import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.*; import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -53,8 +67,10 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli; import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters; import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
@ -65,8 +81,8 @@ import org.junit.Test;
* This class tests if a balancer schedules tasks correctly. * This class tests if a balancer schedules tasks correctly.
*/ */
public class TestBalancer { public class TestBalancer {
private static final Log LOG = LogFactory.getLog( private static final Log LOG = LogFactory.getLog(TestBalancer.class);
"org.apache.hadoop.hdfs.TestBalancer");
static { static {
((Log4JLogger)Balancer.LOG).getLogger().setLevel(Level.ALL); ((Log4JLogger)Balancer.LOG).getLogger().setLevel(Level.ALL);
} }
@ -584,7 +600,7 @@ public class TestBalancer {
// start rebalancing // start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
final int r = Balancer.run(namenodes, p, conf); final int r = runBalancer(namenodes, p, conf);
if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) { DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) {
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
@ -593,10 +609,63 @@ public class TestBalancer {
assertEquals(ExitStatus.SUCCESS.getExitCode(), r); assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
} }
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
LOG.info("Rebalancing with default ctor."); LOG.info(" .");
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, excludedNodes); waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, excludedNodes);
} }
private static int runBalancer(Collection<URI> namenodes, final Parameters p,
Configuration conf) throws IOException, InterruptedException {
final long sleeptime =
conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 2000 +
conf.getLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000;
LOG.info("namenodes = " + namenodes);
LOG.info("parameters = " + p);
LOG.info("Print stack trace", new Throwable());
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
List<NameNodeConnector> connectors = Collections.emptyList();
try {
connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
Balancer.class.getSimpleName(), Balancer.BALANCER_ID_PATH, conf);
boolean done = false;
for(int iteration = 0; !done; iteration++) {
done = true;
Collections.shuffle(connectors);
for(NameNodeConnector nnc : connectors) {
final Balancer b = new Balancer(nnc, p, conf);
final Result r = b.runOneIteration();
r.print(iteration, System.out);
// clean all lists
b.resetData(conf);
if (r.exitStatus == ExitStatus.IN_PROGRESS) {
done = false;
} else if (r.exitStatus != ExitStatus.SUCCESS) {
//must be an error statue, return.
return r.exitStatus.getExitCode();
} else {
if (iteration > 0) {
assertTrue(r.bytesAlreadyMoved > 0);
}
}
}
if (!done) {
Thread.sleep(sleeptime);
}
}
} finally {
for(NameNodeConnector nnc : connectors) {
IOUtils.cleanup(LOG, nnc);
}
}
return ExitStatus.SUCCESS.getExitCode();
}
private void runBalancerCli(Configuration conf, private void runBalancerCli(Configuration conf,
long totalUsedSpace, long totalCapacity, long totalUsedSpace, long totalCapacity,
Balancer.Parameters p, boolean useFile, int expectedExcludedNodes) throws Exception { Balancer.Parameters p, boolean useFile, int expectedExcludedNodes) throws Exception {
@ -1118,7 +1187,6 @@ public class TestBalancer {
initConfWithRamDisk(conf); initConfWithRamDisk(conf);
final int defaultRamDiskCapacity = 10; final int defaultRamDiskCapacity = 10;
final int defaultDiskCapacity = 100;
final long ramDiskStorageLimit = final long ramDiskStorageLimit =
((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) + ((long) defaultRamDiskCapacity * DEFAULT_RAM_DISK_BLOCK_SIZE) +
(DEFAULT_RAM_DISK_BLOCK_SIZE - 1); (DEFAULT_RAM_DISK_BLOCK_SIZE - 1);