HDFS-316. Balancer should run for a configurable # of iterations (Xiaoyu Yao via aw)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
	hadoop-hdfs-project/hadoop-hdfs/src/site/apt/HDFSCommands.apt.vm
This commit is contained in:
Allen Wittenauer 2015-02-11 08:10:34 -08:00 committed by Tsz-Wo Nicholas Sze
parent 9d8dbf8ea4
commit 730597c20c
7 changed files with 61 additions and 18 deletions

View File

@ -332,6 +332,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7790. Do not create optional fields in DFSInputStream unless they are HDFS-7790. Do not create optional fields in DFSInputStream unless they are
needed (cmccabe) needed (cmccabe)
HDFS-316. Balancer should run for a configurable # of iterations (Xiaoyu
Yao via aw)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode. HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

View File

@ -75,6 +75,10 @@
* start the balancer with a default threshold of 10% * start the balancer with a default threshold of 10%
* bin/ start-balancer.sh -threshold 5 * bin/ start-balancer.sh -threshold 5
* start the balancer with a threshold of 5% * start the balancer with a threshold of 5%
* bin/ start-balancer.sh -idleiterations 20
* start the balancer with maximum 20 consecutive idle iterations
* bin/ start-balancer.sh -idleiterations -1
* run the balancer with default threshold infinitely
* To stop: * To stop:
* bin/ stop-balancer.sh * bin/ stop-balancer.sh
* </pre> * </pre>
@ -137,7 +141,7 @@
* <ol> * <ol>
* <li>The cluster is balanced; * <li>The cluster is balanced;
* <li>No block can be moved; * <li>No block can be moved;
* <li>No block has been moved for five consecutive iterations; * <li>No block has been moved for specified consecutive iterations (5 by default);
* <li>An IOException occurs while communicating with the namenode; * <li>An IOException occurs while communicating with the namenode;
* <li>Another balancer is running. * <li>Another balancer is running.
* </ol> * </ol>
@ -148,7 +152,7 @@
* <ol> * <ol>
* <li>The cluster is balanced. Exiting * <li>The cluster is balanced. Exiting
* <li>No block can be moved. Exiting... * <li>No block can be moved. Exiting...
* <li>No block has been moved for 5 iterations. Exiting... * <li>No block has been moved for specified iterations (5 by default). Exiting...
* <li>Received an IO exception: failure reason. Exiting... * <li>Received an IO exception: failure reason. Exiting...
* <li>Another balancer is running. Exiting... * <li>Another balancer is running. Exiting...
* </ol> * </ol>
@ -176,7 +180,9 @@ public class Balancer {
+ "\n\t[-exclude [-f <hosts-file> | comma-sperated list of hosts]]" + "\n\t[-exclude [-f <hosts-file> | comma-sperated list of hosts]]"
+ "\tExcludes the specified datanodes." + "\tExcludes the specified datanodes."
+ "\n\t[-include [-f <hosts-file> | comma-sperated list of hosts]]" + "\n\t[-include [-f <hosts-file> | comma-sperated list of hosts]]"
+ "\tIncludes only the specified datanodes."; + "\tIncludes only the specified datanodes."
+ "\n\t[-idleiterations <idleiterations>]"
+ "\tNumber of consecutive idle iterations (-1 for Infinite) before exit.";
private final Dispatcher dispatcher; private final Dispatcher dispatcher;
private final BalancingPolicy policy; private final BalancingPolicy policy;
@ -573,7 +579,7 @@ static int run(Collection<URI> namenodes, final Parameters p,
List<NameNodeConnector> connectors = Collections.emptyList(); List<NameNodeConnector> connectors = Collections.emptyList();
try { try {
connectors = NameNodeConnector.newNameNodeConnectors(namenodes, connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf); Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf, p.maxIdleIteration);
boolean done = false; boolean done = false;
for(int iteration = 0; !done; iteration++) { for(int iteration = 0; !done; iteration++) {
@ -629,19 +635,22 @@ private static String time2Str(long elapsedTime) {
static class Parameters { static class Parameters {
static final Parameters DEFAULT = new Parameters( static final Parameters DEFAULT = new Parameters(
BalancingPolicy.Node.INSTANCE, 10.0, BalancingPolicy.Node.INSTANCE, 10.0,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
Collections.<String> emptySet(), Collections.<String> emptySet()); Collections.<String> emptySet(), Collections.<String> emptySet());
final BalancingPolicy policy; final BalancingPolicy policy;
final double threshold; final double threshold;
final int maxIdleIteration;
// exclude the nodes in this set from balancing operations // exclude the nodes in this set from balancing operations
Set<String> nodesToBeExcluded; Set<String> nodesToBeExcluded;
//include only these nodes in balancing operations //include only these nodes in balancing operations
Set<String> nodesToBeIncluded; Set<String> nodesToBeIncluded;
Parameters(BalancingPolicy policy, double threshold, Parameters(BalancingPolicy policy, double threshold, int maxIdleIteration,
Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) { Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) {
this.policy = policy; this.policy = policy;
this.threshold = threshold; this.threshold = threshold;
this.maxIdleIteration = maxIdleIteration;
this.nodesToBeExcluded = nodesToBeExcluded; this.nodesToBeExcluded = nodesToBeExcluded;
this.nodesToBeIncluded = nodesToBeIncluded; this.nodesToBeIncluded = nodesToBeIncluded;
} }
@ -650,6 +659,7 @@ static class Parameters {
public String toString() { public String toString() {
return Balancer.class.getSimpleName() + "." + getClass().getSimpleName() return Balancer.class.getSimpleName() + "." + getClass().getSimpleName()
+ "[" + policy + ", threshold=" + threshold + + "[" + policy + ", threshold=" + threshold +
", max idle iteration = " + maxIdleIteration +
", number of nodes to be excluded = "+ nodesToBeExcluded.size() + ", number of nodes to be excluded = "+ nodesToBeExcluded.size() +
", number of nodes to be included = "+ nodesToBeIncluded.size() +"]"; ", number of nodes to be included = "+ nodesToBeIncluded.size() +"]";
} }
@ -688,6 +698,7 @@ public int run(String[] args) {
static Parameters parse(String[] args) { static Parameters parse(String[] args) {
BalancingPolicy policy = Parameters.DEFAULT.policy; BalancingPolicy policy = Parameters.DEFAULT.policy;
double threshold = Parameters.DEFAULT.threshold; double threshold = Parameters.DEFAULT.threshold;
int maxIdleIteration = Parameters.DEFAULT.maxIdleIteration;
Set<String> nodesTobeExcluded = Parameters.DEFAULT.nodesToBeExcluded; Set<String> nodesTobeExcluded = Parameters.DEFAULT.nodesToBeExcluded;
Set<String> nodesTobeIncluded = Parameters.DEFAULT.nodesToBeIncluded; Set<String> nodesTobeIncluded = Parameters.DEFAULT.nodesToBeIncluded;
@ -743,6 +754,11 @@ static Parameters parse(String[] args) {
} else { } else {
nodesTobeIncluded = Util.parseHostList(args[i]); nodesTobeIncluded = Util.parseHostList(args[i]);
} }
} else if ("-idleiterations".equalsIgnoreCase(args[i])) {
checkArgument(++i < args.length,
"idleiterations value is missing: args = " + Arrays.toString(args));
maxIdleIteration = Integer.parseInt(args[i]);
LOG.info("Using a idleiterations of " + maxIdleIteration);
} else { } else {
throw new IllegalArgumentException("args = " throw new IllegalArgumentException("args = "
+ Arrays.toString(args)); + Arrays.toString(args));
@ -756,7 +772,7 @@ static Parameters parse(String[] args) {
} }
} }
return new Parameters(policy, threshold, nodesTobeExcluded, nodesTobeIncluded); return new Parameters(policy, threshold, maxIdleIteration, nodesTobeExcluded, nodesTobeIncluded);
} }
private static void printUsage(PrintStream out) { private static void printUsage(PrintStream out) {

View File

@ -61,18 +61,18 @@
public class NameNodeConnector implements Closeable { public class NameNodeConnector implements Closeable {
private static final Log LOG = LogFactory.getLog(NameNodeConnector.class); private static final Log LOG = LogFactory.getLog(NameNodeConnector.class);
private static final int MAX_NOT_CHANGED_ITERATIONS = 5; public static final int DEFAULT_MAX_IDLE_ITERATIONS = 5;
private static boolean write2IdFile = true; private static boolean write2IdFile = true;
/** Create {@link NameNodeConnector} for the given namenodes. */ /** Create {@link NameNodeConnector} for the given namenodes. */
public static List<NameNodeConnector> newNameNodeConnectors( public static List<NameNodeConnector> newNameNodeConnectors(
Collection<URI> namenodes, String name, Path idPath, Configuration conf) Collection<URI> namenodes, String name, Path idPath, Configuration conf,
throws IOException { int maxIdleIterations) throws IOException {
final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>( final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>(
namenodes.size()); namenodes.size());
for (URI uri : namenodes) { for (URI uri : namenodes) {
NameNodeConnector nnc = new NameNodeConnector(name, uri, idPath, NameNodeConnector nnc = new NameNodeConnector(name, uri, idPath,
null, conf); null, conf, maxIdleIterations);
nnc.getKeyManager().startBlockKeyUpdater(); nnc.getKeyManager().startBlockKeyUpdater();
connectors.add(nnc); connectors.add(nnc);
} }
@ -81,12 +81,12 @@ public static List<NameNodeConnector> newNameNodeConnectors(
public static List<NameNodeConnector> newNameNodeConnectors( public static List<NameNodeConnector> newNameNodeConnectors(
Map<URI, List<Path>> namenodes, String name, Path idPath, Map<URI, List<Path>> namenodes, String name, Path idPath,
Configuration conf) throws IOException { Configuration conf, int maxIdleIterations) throws IOException {
final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>( final List<NameNodeConnector> connectors = new ArrayList<NameNodeConnector>(
namenodes.size()); namenodes.size());
for (Map.Entry<URI, List<Path>> entry : namenodes.entrySet()) { for (Map.Entry<URI, List<Path>> entry : namenodes.entrySet()) {
NameNodeConnector nnc = new NameNodeConnector(name, entry.getKey(), NameNodeConnector nnc = new NameNodeConnector(name, entry.getKey(),
idPath, entry.getValue(), conf); idPath, entry.getValue(), conf, maxIdleIterations);
nnc.getKeyManager().startBlockKeyUpdater(); nnc.getKeyManager().startBlockKeyUpdater();
connectors.add(nnc); connectors.add(nnc);
} }
@ -112,15 +112,18 @@ public static void setWrite2IdFile(boolean write2IdFile) {
private final List<Path> targetPaths; private final List<Path> targetPaths;
private final AtomicLong bytesMoved = new AtomicLong(); private final AtomicLong bytesMoved = new AtomicLong();
private final int maxNotChangedIterations;
private int notChangedIterations = 0; private int notChangedIterations = 0;
public NameNodeConnector(String name, URI nameNodeUri, Path idPath, public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
List<Path> targetPaths, Configuration conf) List<Path> targetPaths, Configuration conf,
int maxNotChangedIterations)
throws IOException { throws IOException {
this.nameNodeUri = nameNodeUri; this.nameNodeUri = nameNodeUri;
this.idPath = idPath; this.idPath = idPath;
this.targetPaths = targetPaths == null || targetPaths.isEmpty() ? Arrays this.targetPaths = targetPaths == null || targetPaths.isEmpty() ? Arrays
.asList(new Path("/")) : targetPaths; .asList(new Path("/")) : targetPaths;
this.maxNotChangedIterations = maxNotChangedIterations;
this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri, this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
NamenodeProtocol.class).getProxy(); NamenodeProtocol.class).getProxy();
@ -183,7 +186,14 @@ public boolean shouldContinue(long dispatchBlockMoveBytes) {
notChangedIterations = 0; notChangedIterations = 0;
} else { } else {
notChangedIterations++; notChangedIterations++;
if (notChangedIterations >= MAX_NOT_CHANGED_ITERATIONS) { if (LOG.isDebugEnabled()) {
LOG.debug("No block has been moved for " +
notChangedIterations + " iterations, " +
"maximum notChangedIterations before exit is: " +
((maxNotChangedIterations >= 0) ? maxNotChangedIterations : "Infinite"));
}
if ((maxNotChangedIterations >= 0) &&
(notChangedIterations >= maxNotChangedIterations)) {
System.out.println("No block has been moved for " System.out.println("No block has been moved for "
+ notChangedIterations + " iterations. Exiting..."); + notChangedIterations + " iterations. Exiting...");
return false; return false;

View File

@ -530,7 +530,8 @@ static int run(Map<URI, List<Path>> namenodes, Configuration conf)
List<NameNodeConnector> connectors = Collections.emptyList(); List<NameNodeConnector> connectors = Collections.emptyList();
try { try {
connectors = NameNodeConnector.newNameNodeConnectors(namenodes, connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
Mover.class.getSimpleName(), MOVER_ID_PATH, conf); Mover.class.getSimpleName(), MOVER_ID_PATH, conf,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
while (connectors.size() > 0) { while (connectors.size() > 0) {
Collections.shuffle(connectors); Collections.shuffle(connectors);

View File

@ -134,7 +134,7 @@ HDFS Commands Guide
to stop the rebalancing process. See to stop the rebalancing process. See
{{{./HdfsUserGuide.html#Balancer}Balancer}} for more details. {{{./HdfsUserGuide.html#Balancer}Balancer}} for more details.
Usage: <<<hdfs balancer [-threshold <threshold>] [-policy <policy>]>>> Usage: <<<hdfs balancer [-threshold <threshold>] [-policy <policy>] [-idleiterations <idleiterations>]>>>
*------------------------+----------------------------------------------------+ *------------------------+----------------------------------------------------+
|| COMMAND_OPTION | Description || COMMAND_OPTION | Description
@ -146,6 +146,9 @@ HDFS Commands Guide
| | each datanode is balanced. \ | | each datanode is balanced. \
| | <<<blockpool>>>: Cluster is balanced if each block | | <<<blockpool>>>: Cluster is balanced if each block
| | pool in each datanode is balanced. | | pool in each datanode is balanced.
*------------------------+----------------------------------------------------+
| -idleiterations <iterations> | Maximum number of idle iterations before exit.
| | This overwrites the default idleiterations(5).
*------------------------+----------------------------------------------------+ *------------------------+----------------------------------------------------+
Note that the <<<blockpool>>> policy is more strict than the <<<datanode>>> Note that the <<<blockpool>>> policy is more strict than the <<<datanode>>>

View File

@ -613,6 +613,7 @@ private void doTest(Configuration conf, long[] capacities,
p = new Balancer.Parameters( p = new Balancer.Parameters(
Balancer.Parameters.DEFAULT.policy, Balancer.Parameters.DEFAULT.policy,
Balancer.Parameters.DEFAULT.threshold, Balancer.Parameters.DEFAULT.threshold,
Balancer.Parameters.DEFAULT.maxIdleIteration,
nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded()); nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded());
} }
@ -678,7 +679,8 @@ private static int runBalancer(Collection<URI> namenodes, final Parameters p,
List<NameNodeConnector> connectors = Collections.emptyList(); List<NameNodeConnector> connectors = Collections.emptyList();
try { try {
connectors = NameNodeConnector.newNameNodeConnectors(namenodes, connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
Balancer.class.getSimpleName(), Balancer.BALANCER_ID_PATH, conf); Balancer.class.getSimpleName(), Balancer.BALANCER_ID_PATH, conf,
Balancer.Parameters.DEFAULT.maxIdleIteration);
boolean done = false; boolean done = false;
for(int iteration = 0; !done; iteration++) { for(int iteration = 0; !done; iteration++) {
@ -850,6 +852,7 @@ public void testUnknownDatanode() throws Exception {
Balancer.Parameters p = new Balancer.Parameters( Balancer.Parameters p = new Balancer.Parameters(
Balancer.Parameters.DEFAULT.policy, Balancer.Parameters.DEFAULT.policy,
Balancer.Parameters.DEFAULT.threshold, Balancer.Parameters.DEFAULT.threshold,
Balancer.Parameters.DEFAULT.maxIdleIteration,
datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded); datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded);
final int r = Balancer.run(namenodes, p, conf); final int r = Balancer.run(namenodes, p, conf);
assertEquals(ExitStatus.SUCCESS.getExitCode(), r); assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
@ -1284,6 +1287,7 @@ public void testBalancerWithRamDisk() throws Exception {
Balancer.Parameters p = new Balancer.Parameters( Balancer.Parameters p = new Balancer.Parameters(
Parameters.DEFAULT.policy, Parameters.DEFAULT.policy,
Parameters.DEFAULT.threshold, Parameters.DEFAULT.threshold,
Balancer.Parameters.DEFAULT.maxIdleIteration,
Parameters.DEFAULT.nodesToBeExcluded, Parameters.DEFAULT.nodesToBeExcluded,
Parameters.DEFAULT.nodesToBeIncluded); Parameters.DEFAULT.nodesToBeIncluded);
final int r = Balancer.run(namenodes, p, conf); final int r = Balancer.run(namenodes, p, conf);

View File

@ -21,6 +21,7 @@
import java.net.URI; import java.net.URI;
import java.util.*; import java.util.*;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -39,9 +40,14 @@ public class TestMover {
static Mover newMover(Configuration conf) throws IOException { static Mover newMover(Configuration conf) throws IOException {
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Assert.assertEquals(1, namenodes.size()); Assert.assertEquals(1, namenodes.size());
Map<URI, List<Path>> nnMap = Maps.newHashMap();
for (URI nn : namenodes) {
nnMap.put(nn, null);
}
final List<NameNodeConnector> nncs = NameNodeConnector.newNameNodeConnectors( final List<NameNodeConnector> nncs = NameNodeConnector.newNameNodeConnectors(
namenodes, Mover.class.getSimpleName(), Mover.MOVER_ID_PATH, conf); nnMap, Mover.class.getSimpleName(), Mover.MOVER_ID_PATH, conf,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
return new Mover(nncs.get(0), conf); return new Mover(nncs.get(0), conf);
} }