HDFS-9008. Balancer#Parameters class could use a builder pattern. (Chris Trezzo via mingma)

This commit is contained in:
Ming Ma 2015-09-15 10:16:02 -07:00
parent 73e3a49eb0
commit 083b44c136
7 changed files with 317 additions and 202 deletions

View File

@ -915,6 +915,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9065. Include commas on # of files, blocks, total filesystem objects HDFS-9065. Include commas on # of files, blocks, total filesystem objects
in NN Web UI. (Daniel Templeton via wheat9) in NN Web UI. (Daniel Templeton via wheat9)
HDFS-9008. Balancer#Parameters class could use a builder pattern.
(Chris Trezzo via mingma)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -243,7 +243,8 @@ public class Balancer {
* namenode as a client and a secondary namenode and retry proxies * namenode as a client and a secondary namenode and retry proxies
* when connection fails. * when connection fails.
*/ */
Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) { Balancer(NameNodeConnector theblockpool, BalancerParameters p,
Configuration conf) {
final long movedWinWidth = getLong(conf, final long movedWinWidth = getLong(conf,
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT); DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
@ -265,13 +266,15 @@ public class Balancer {
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT); DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);
this.nnc = theblockpool; this.nnc = theblockpool;
this.dispatcher = new Dispatcher(theblockpool, p.includedNodes, this.dispatcher =
p.excludedNodes, movedWinWidth, moverThreads, dispatcherThreads, new Dispatcher(theblockpool, p.getIncludedNodes(),
maxConcurrentMovesPerNode, getBlocksSize, getBlocksMinBlockSize, conf); p.getExcludedNodes(), movedWinWidth, moverThreads,
this.threshold = p.threshold; dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize,
this.policy = p.policy; getBlocksMinBlockSize, conf);
this.sourceNodes = p.sourceNodes; this.threshold = p.getThreshold();
this.runDuringUpgrade = p.runDuringUpgrade; this.policy = p.getBalancingPolicy();
this.sourceNodes = p.getSourceNodes();
this.runDuringUpgrade = p.getRunDuringUpgrade();
this.maxSizeToMove = getLong(conf, this.maxSizeToMove = getLong(conf,
DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY, DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY,
@ -629,7 +632,7 @@ public class Balancer {
* for each namenode, * for each namenode,
* execute a {@link Balancer} to work through all datanodes once. * execute a {@link Balancer} to work through all datanodes once.
*/ */
static int run(Collection<URI> namenodes, final Parameters p, static int run(Collection<URI> namenodes, final BalancerParameters p,
Configuration conf) throws IOException, InterruptedException { Configuration conf) throws IOException, InterruptedException {
final long sleeptime = final long sleeptime =
conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
@ -638,24 +641,25 @@ public class Balancer {
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000; DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000;
LOG.info("namenodes = " + namenodes); LOG.info("namenodes = " + namenodes);
LOG.info("parameters = " + p); LOG.info("parameters = " + p);
LOG.info("included nodes = " + p.includedNodes); LOG.info("included nodes = " + p.getIncludedNodes());
LOG.info("excluded nodes = " + p.excludedNodes); LOG.info("excluded nodes = " + p.getExcludedNodes());
LOG.info("source nodes = " + p.sourceNodes); LOG.info("source nodes = " + p.getSourceNodes());
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved"); System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
List<NameNodeConnector> connectors = Collections.emptyList(); List<NameNodeConnector> connectors = Collections.emptyList();
try { try {
connectors = NameNodeConnector.newNameNodeConnectors(namenodes, connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf, p.maxIdleIteration); Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf,
p.getMaxIdleIteration());
boolean done = false; boolean done = false;
for(int iteration = 0; !done; iteration++) { for(int iteration = 0; !done; iteration++) {
done = true; done = true;
Collections.shuffle(connectors); Collections.shuffle(connectors);
for(NameNodeConnector nnc : connectors) { for(NameNodeConnector nnc : connectors) {
if (p.blockpools.size() == 0 if (p.getBlockPools().size() == 0
|| p.blockpools.contains(nnc.getBlockpoolID())) { || p.getBlockPools().contains(nnc.getBlockpoolID())) {
final Balancer b = new Balancer(nnc, p, conf); final Balancer b = new Balancer(nnc, p, conf);
final Result r = b.runOneIteration(); final Result r = b.runOneIteration();
r.print(iteration, System.out); r.print(iteration, System.out);
@ -705,65 +709,6 @@ public class Balancer {
return time+" "+unit; return time+" "+unit;
} }
static class Parameters {
static final Parameters DEFAULT =
new Parameters(BalancingPolicy.Node.INSTANCE, 10.0,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
Collections.<String> emptySet(), Collections.<String> emptySet(),
Collections.<String> emptySet(), Collections.<String> emptySet(),
false);
final BalancingPolicy policy;
final double threshold;
final int maxIdleIteration;
/** Exclude the nodes in this set. */
final Set<String> excludedNodes;
/** If empty, include any node; otherwise, include only these nodes. */
final Set<String> includedNodes;
/** If empty, any node can be a source;
* otherwise, use only these nodes as source nodes.
*/
final Set<String> sourceNodes;
/**
* A set of block pools to run the balancer on.
*/
final Set<String> blockpools;
/**
* Whether to run the balancer during upgrade.
*/
final boolean runDuringUpgrade;
Parameters(BalancingPolicy policy, double threshold, int maxIdleIteration,
Set<String> excludedNodes, Set<String> includedNodes,
Set<String> sourceNodes, Set<String> blockpools,
boolean runDuringUpgrade) {
this.policy = policy;
this.threshold = threshold;
this.maxIdleIteration = maxIdleIteration;
this.excludedNodes = excludedNodes;
this.includedNodes = includedNodes;
this.sourceNodes = sourceNodes;
this.blockpools = blockpools;
this.runDuringUpgrade = runDuringUpgrade;
}
@Override
public String toString() {
return String.format("%s.%s [%s,"
+ " threshold = %s,"
+ " max idle iteration = %s,"
+ " #excluded nodes = %s,"
+ " #included nodes = %s,"
+ " #source nodes = %s,"
+ " #blockpools = %s,"
+ " run during upgrade = %s]",
Balancer.class.getSimpleName(), getClass().getSimpleName(), policy,
threshold, maxIdleIteration, excludedNodes.size(),
includedNodes.size(), sourceNodes.size(), blockpools.size(),
runDuringUpgrade);
}
}
static class Cli extends Configured implements Tool { static class Cli extends Configured implements Tool {
/** /**
* Parse arguments and then run Balancer. * Parse arguments and then run Balancer.
@ -796,15 +741,10 @@ public class Balancer {
} }
/** parse command line arguments */ /** parse command line arguments */
static Parameters parse(String[] args) { static BalancerParameters parse(String[] args) {
BalancingPolicy policy = Parameters.DEFAULT.policy; Set<String> excludedNodes = null;
double threshold = Parameters.DEFAULT.threshold; Set<String> includedNodes = null;
int maxIdleIteration = Parameters.DEFAULT.maxIdleIteration; BalancerParameters.Builder b = new BalancerParameters.Builder();
Set<String> excludedNodes = Parameters.DEFAULT.excludedNodes;
Set<String> includedNodes = Parameters.DEFAULT.includedNodes;
Set<String> sourceNodes = Parameters.DEFAULT.sourceNodes;
Set<String> blockpools = Parameters.DEFAULT.blockpools;
boolean runDuringUpgrade = Parameters.DEFAULT.runDuringUpgrade;
if (args != null) { if (args != null) {
try { try {
@ -813,12 +753,13 @@ public class Balancer {
checkArgument(++i < args.length, checkArgument(++i < args.length,
"Threshold value is missing: args = " + Arrays.toString(args)); "Threshold value is missing: args = " + Arrays.toString(args));
try { try {
threshold = Double.parseDouble(args[i]); double threshold = Double.parseDouble(args[i]);
if (threshold < 1 || threshold > 100) { if (threshold < 1 || threshold > 100) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Number out of range: threshold = " + threshold); "Number out of range: threshold = " + threshold);
} }
LOG.info( "Using a threshold of " + threshold ); LOG.info( "Using a threshold of " + threshold );
b.setThreshold(threshold);
} catch(IllegalArgumentException e) { } catch(IllegalArgumentException e) {
System.err.println( System.err.println(
"Expecting a number in the range of [1.0, 100.0]: " "Expecting a number in the range of [1.0, 100.0]: "
@ -829,7 +770,7 @@ public class Balancer {
checkArgument(++i < args.length, checkArgument(++i < args.length,
"Policy value is missing: args = " + Arrays.toString(args)); "Policy value is missing: args = " + Arrays.toString(args));
try { try {
policy = BalancingPolicy.parse(args[i]); b.setBalancingPolicy(BalancingPolicy.parse(args[i]));
} catch(IllegalArgumentException e) { } catch(IllegalArgumentException e) {
System.err.println("Illegal policy name: " + args[i]); System.err.println("Illegal policy name: " + args[i]);
throw e; throw e;
@ -837,28 +778,33 @@ public class Balancer {
} else if ("-exclude".equalsIgnoreCase(args[i])) { } else if ("-exclude".equalsIgnoreCase(args[i])) {
excludedNodes = new HashSet<>(); excludedNodes = new HashSet<>();
i = processHostList(args, i, "exclude", excludedNodes); i = processHostList(args, i, "exclude", excludedNodes);
b.setExcludedNodes(excludedNodes);
} else if ("-include".equalsIgnoreCase(args[i])) { } else if ("-include".equalsIgnoreCase(args[i])) {
includedNodes = new HashSet<>(); includedNodes = new HashSet<>();
i = processHostList(args, i, "include", includedNodes); i = processHostList(args, i, "include", includedNodes);
b.setIncludedNodes(includedNodes);
} else if ("-source".equalsIgnoreCase(args[i])) { } else if ("-source".equalsIgnoreCase(args[i])) {
sourceNodes = new HashSet<>(); Set<String> sourceNodes = new HashSet<>();
i = processHostList(args, i, "source", sourceNodes); i = processHostList(args, i, "source", sourceNodes);
b.setSourceNodes(sourceNodes);
} else if ("-blockpools".equalsIgnoreCase(args[i])) { } else if ("-blockpools".equalsIgnoreCase(args[i])) {
checkArgument( checkArgument(
++i < args.length, ++i < args.length,
"blockpools value is missing: args = " "blockpools value is missing: args = "
+ Arrays.toString(args)); + Arrays.toString(args));
blockpools = parseBlockPoolList(args[i]); Set<String> blockpools = parseBlockPoolList(args[i]);
LOG.info("Balancer will run on the following blockpools: " LOG.info("Balancer will run on the following blockpools: "
+ blockpools.toString()); + blockpools.toString());
b.setBlockpools(blockpools);
} else if ("-idleiterations".equalsIgnoreCase(args[i])) { } else if ("-idleiterations".equalsIgnoreCase(args[i])) {
checkArgument(++i < args.length, checkArgument(++i < args.length,
"idleiterations value is missing: args = " + Arrays "idleiterations value is missing: args = " + Arrays
.toString(args)); .toString(args));
maxIdleIteration = Integer.parseInt(args[i]); int maxIdleIteration = Integer.parseInt(args[i]);
LOG.info("Using a idleiterations of " + maxIdleIteration); LOG.info("Using a idleiterations of " + maxIdleIteration);
b.setMaxIdleIteration(maxIdleIteration);
} else if ("-runDuringUpgrade".equalsIgnoreCase(args[i])) { } else if ("-runDuringUpgrade".equalsIgnoreCase(args[i])) {
runDuringUpgrade = true; b.setRunDuringUpgrade(true);
LOG.info("Will run the balancer even during an ongoing HDFS " LOG.info("Will run the balancer even during an ongoing HDFS "
+ "upgrade. Most users will not want to run the balancer " + "upgrade. Most users will not want to run the balancer "
+ "during an upgrade since it will not affect used space " + "during an upgrade since it will not affect used space "
@ -868,16 +814,14 @@ public class Balancer {
+ Arrays.toString(args)); + Arrays.toString(args));
} }
} }
checkArgument(excludedNodes.isEmpty() || includedNodes.isEmpty(), checkArgument(excludedNodes == null || includedNodes == null,
"-exclude and -include options cannot be specified together."); "-exclude and -include options cannot be specified together.");
} catch(RuntimeException e) { } catch(RuntimeException e) {
printUsage(System.err); printUsage(System.err);
throw e; throw e;
} }
} }
return b.build();
return new Parameters(policy, threshold, maxIdleIteration, excludedNodes,
includedNodes, sourceNodes, blockpools, runDuringUpgrade);
} }
private static int processHostList(String[] args, int i, String type, private static int processHostList(String[] args, int i, String type,

View File

@ -0,0 +1,168 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.balancer;
import java.util.Collections;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
final class BalancerParameters {
private final BalancingPolicy policy;
private final double threshold;
private final int maxIdleIteration;
/** Exclude the nodes in this set. */
private final Set<String> excludedNodes;
/** If empty, include any node; otherwise, include only these nodes. */
private final Set<String> includedNodes;
/**
* If empty, any node can be a source; otherwise, use only these nodes as
* source nodes.
*/
private final Set<String> sourceNodes;
/**
* A set of block pools to run the balancer on.
*/
private final Set<String> blockpools;
/**
* Whether to run the balancer during upgrade.
*/
private final boolean runDuringUpgrade;
static final BalancerParameters DEFAULT = new BalancerParameters();
private BalancerParameters() {
this(new Builder());
}
private BalancerParameters(Builder builder) {
this.policy = builder.policy;
this.threshold = builder.threshold;
this.maxIdleIteration = builder.maxIdleIteration;
this.excludedNodes = builder.excludedNodes;
this.includedNodes = builder.includedNodes;
this.sourceNodes = builder.sourceNodes;
this.blockpools = builder.blockpools;
this.runDuringUpgrade = builder.runDuringUpgrade;
}
BalancingPolicy getBalancingPolicy() {
return this.policy;
}
double getThreshold() {
return this.threshold;
}
int getMaxIdleIteration() {
return this.maxIdleIteration;
}
Set<String> getExcludedNodes() {
return this.excludedNodes;
}
Set<String> getIncludedNodes() {
return this.includedNodes;
}
Set<String> getSourceNodes() {
return this.sourceNodes;
}
Set<String> getBlockPools() {
return this.blockpools;
}
boolean getRunDuringUpgrade() {
return this.runDuringUpgrade;
}
@Override
public String toString() {
return String.format("%s.%s [%s," + " threshold = %s,"
+ " max idle iteration = %s," + " #excluded nodes = %s,"
+ " #included nodes = %s," + " #source nodes = %s,"
+ " #blockpools = %s," + " run during upgrade = %s]",
Balancer.class.getSimpleName(), getClass().getSimpleName(), policy,
threshold, maxIdleIteration, excludedNodes.size(),
includedNodes.size(), sourceNodes.size(), blockpools.size(),
runDuringUpgrade);
}
static class Builder {
// Defaults
private BalancingPolicy policy = BalancingPolicy.Node.INSTANCE;
private double threshold = 10.0;
private int maxIdleIteration =
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS;
private Set<String> excludedNodes = Collections.<String> emptySet();
private Set<String> includedNodes = Collections.<String> emptySet();
private Set<String> sourceNodes = Collections.<String> emptySet();
private Set<String> blockpools = Collections.<String> emptySet();
private boolean runDuringUpgrade = false;
Builder() {
}
Builder setBalancingPolicy(BalancingPolicy p) {
this.policy = p;
return this;
}
Builder setThreshold(double t) {
this.threshold = t;
return this;
}
Builder setMaxIdleIteration(int m) {
this.maxIdleIteration = m;
return this;
}
Builder setExcludedNodes(Set<String> nodes) {
this.excludedNodes = nodes;
return this;
}
Builder setIncludedNodes(Set<String> nodes) {
this.includedNodes = nodes;
return this;
}
Builder setSourceNodes(Set<String> nodes) {
this.sourceNodes = nodes;
return this;
}
Builder setBlockpools(Set<String> pools) {
this.blockpools = pools;
return this;
}
Builder setRunDuringUpgrade(boolean run) {
this.runDuringUpgrade = run;
return this;
}
BalancerParameters build() {
return new BalancerParameters(this);
}
}
}

View File

@ -75,8 +75,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli; import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Result; import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
import org.apache.hadoop.hdfs.server.balancer.BalancerParameters;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
@ -319,7 +319,7 @@ public class TestBalancer {
* @throws TimeoutException * @throws TimeoutException
*/ */
static void waitForBalancer(long totalUsedSpace, long totalCapacity, static void waitForBalancer(long totalUsedSpace, long totalCapacity,
ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p) ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p)
throws IOException, TimeoutException { throws IOException, TimeoutException {
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, 0); waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, 0);
} }
@ -377,7 +377,7 @@ public class TestBalancer {
// start rebalancing // start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
} finally { } finally {
@ -393,16 +393,16 @@ public class TestBalancer {
* @throws TimeoutException * @throws TimeoutException
*/ */
static void waitForBalancer(long totalUsedSpace, long totalCapacity, static void waitForBalancer(long totalUsedSpace, long totalCapacity,
ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p, ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p,
int expectedExcludedNodes) throws IOException, TimeoutException { int expectedExcludedNodes) throws IOException, TimeoutException {
long timeout = TIMEOUT; long timeout = TIMEOUT;
long failtime = (timeout <= 0L) ? Long.MAX_VALUE long failtime = (timeout <= 0L) ? Long.MAX_VALUE
: Time.monotonicNow() + timeout; : Time.monotonicNow() + timeout;
if (!p.includedNodes.isEmpty()) { if (!p.getIncludedNodes().isEmpty()) {
totalCapacity = p.includedNodes.size() * CAPACITY; totalCapacity = p.getIncludedNodes().size() * CAPACITY;
} }
if (!p.excludedNodes.isEmpty()) { if (!p.getExcludedNodes().isEmpty()) {
totalCapacity -= p.excludedNodes.size() * CAPACITY; totalCapacity -= p.getExcludedNodes().size() * CAPACITY;
} }
final double avgUtilization = ((double)totalUsedSpace) / totalCapacity; final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
boolean balanced; boolean balanced;
@ -415,12 +415,12 @@ public class TestBalancer {
for (DatanodeInfo datanode : datanodeReport) { for (DatanodeInfo datanode : datanodeReport) {
double nodeUtilization = ((double)datanode.getDfsUsed()) double nodeUtilization = ((double)datanode.getDfsUsed())
/ datanode.getCapacity(); / datanode.getCapacity();
if (Dispatcher.Util.isExcluded(p.excludedNodes, datanode)) { if (Dispatcher.Util.isExcluded(p.getExcludedNodes(), datanode)) {
assertTrue(nodeUtilization == 0); assertTrue(nodeUtilization == 0);
actualExcludedNodeCount++; actualExcludedNodeCount++;
continue; continue;
} }
if (!Dispatcher.Util.isIncluded(p.includedNodes, datanode)) { if (!Dispatcher.Util.isIncluded(p.getIncludedNodes(), datanode)) {
assertTrue(nodeUtilization == 0); assertTrue(nodeUtilization == 0);
actualExcludedNodeCount++; actualExcludedNodeCount++;
continue; continue;
@ -636,16 +636,14 @@ public class TestBalancer {
} }
} }
// run balancer and validate results // run balancer and validate results
Balancer.Parameters p = Balancer.Parameters.DEFAULT; BalancerParameters.Builder pBuilder =
new BalancerParameters.Builder();
if (nodes != null) { if (nodes != null) {
p = new Balancer.Parameters( pBuilder.setExcludedNodes(nodes.getNodesToBeExcluded());
Balancer.Parameters.DEFAULT.policy, pBuilder.setIncludedNodes(nodes.getNodesToBeIncluded());
Balancer.Parameters.DEFAULT.threshold, pBuilder.setRunDuringUpgrade(false);
Balancer.Parameters.DEFAULT.maxIdleIteration,
nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded(),
Balancer.Parameters.DEFAULT.sourceNodes,
Balancer.Parameters.DEFAULT.blockpools, false);
} }
BalancerParameters p = pBuilder.build();
int expectedExcludedNodes = 0; int expectedExcludedNodes = 0;
if (nodes != null) { if (nodes != null) {
@ -668,14 +666,15 @@ public class TestBalancer {
} }
} }
private void runBalancer(Configuration conf, private void runBalancer(Configuration conf, long totalUsedSpace,
long totalUsedSpace, long totalCapacity) throws Exception { long totalCapacity) throws Exception {
runBalancer(conf, totalUsedSpace, totalCapacity, Balancer.Parameters.DEFAULT, 0); runBalancer(conf, totalUsedSpace, totalCapacity,
BalancerParameters.DEFAULT, 0);
} }
private void runBalancer(Configuration conf, private void runBalancer(Configuration conf, long totalUsedSpace,
long totalUsedSpace, long totalCapacity, Balancer.Parameters p, long totalCapacity, BalancerParameters p, int excludedNodes)
int excludedNodes) throws Exception { throws Exception {
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
// start rebalancing // start rebalancing
@ -693,7 +692,8 @@ public class TestBalancer {
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, excludedNodes); waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, excludedNodes);
} }
private static int runBalancer(Collection<URI> namenodes, final Parameters p, private static int runBalancer(Collection<URI> namenodes,
final BalancerParameters p,
Configuration conf) throws IOException, InterruptedException { Configuration conf) throws IOException, InterruptedException {
final long sleeptime = final long sleeptime =
conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
@ -710,7 +710,7 @@ public class TestBalancer {
try { try {
connectors = NameNodeConnector.newNameNodeConnectors(namenodes, connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
Balancer.class.getSimpleName(), Balancer.BALANCER_ID_PATH, conf, Balancer.class.getSimpleName(), Balancer.BALANCER_ID_PATH, conf,
Balancer.Parameters.DEFAULT.maxIdleIteration); BalancerParameters.DEFAULT.getMaxIdleIteration());
boolean done = false; boolean done = false;
for(int iteration = 0; !done; iteration++) { for(int iteration = 0; !done; iteration++) {
@ -747,45 +747,45 @@ public class TestBalancer {
return ExitStatus.SUCCESS.getExitCode(); return ExitStatus.SUCCESS.getExitCode();
} }
private void runBalancerCli(Configuration conf, private void runBalancerCli(Configuration conf, long totalUsedSpace,
long totalUsedSpace, long totalCapacity, long totalCapacity, BalancerParameters p, boolean useFile,
Balancer.Parameters p, boolean useFile, int expectedExcludedNodes) throws Exception { int expectedExcludedNodes) throws Exception {
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
List <String> args = new ArrayList<String>(); List <String> args = new ArrayList<String>();
args.add("-policy"); args.add("-policy");
args.add("datanode"); args.add("datanode");
File excludeHostsFile = null; File excludeHostsFile = null;
if (!p.excludedNodes.isEmpty()) { if (!p.getExcludedNodes().isEmpty()) {
args.add("-exclude"); args.add("-exclude");
if (useFile) { if (useFile) {
excludeHostsFile = new File ("exclude-hosts-file"); excludeHostsFile = new File ("exclude-hosts-file");
PrintWriter pw = new PrintWriter(excludeHostsFile); PrintWriter pw = new PrintWriter(excludeHostsFile);
for (String host: p.excludedNodes) { for (String host : p.getExcludedNodes()) {
pw.write( host + "\n"); pw.write( host + "\n");
} }
pw.close(); pw.close();
args.add("-f"); args.add("-f");
args.add("exclude-hosts-file"); args.add("exclude-hosts-file");
} else { } else {
args.add(StringUtils.join(p.excludedNodes, ',')); args.add(StringUtils.join(p.getExcludedNodes(), ','));
} }
} }
File includeHostsFile = null; File includeHostsFile = null;
if (!p.includedNodes.isEmpty()) { if (!p.getIncludedNodes().isEmpty()) {
args.add("-include"); args.add("-include");
if (useFile) { if (useFile) {
includeHostsFile = new File ("include-hosts-file"); includeHostsFile = new File ("include-hosts-file");
PrintWriter pw = new PrintWriter(includeHostsFile); PrintWriter pw = new PrintWriter(includeHostsFile);
for (String host: p.includedNodes){ for (String host : p.getIncludedNodes()) {
pw.write( host + "\n"); pw.write( host + "\n");
} }
pw.close(); pw.close();
args.add("-f"); args.add("-f");
args.add("include-hosts-file"); args.add("include-hosts-file");
} else { } else {
args.add(StringUtils.join(p.includedNodes, ',')); args.add(StringUtils.join(p.getIncludedNodes(), ','));
} }
} }
@ -879,14 +879,11 @@ public class TestBalancer {
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Set<String> datanodes = new HashSet<String>(); Set<String> datanodes = new HashSet<String>();
datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName()); datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName());
Balancer.Parameters p = new Balancer.Parameters( BalancerParameters.Builder pBuilder =
Balancer.Parameters.DEFAULT.policy, new BalancerParameters.Builder();
Balancer.Parameters.DEFAULT.threshold, pBuilder.setExcludedNodes(datanodes);
Balancer.Parameters.DEFAULT.maxIdleIteration, pBuilder.setRunDuringUpgrade(false);
datanodes, Balancer.Parameters.DEFAULT.includedNodes, final int r = Balancer.run(namenodes, pBuilder.build(), conf);
Balancer.Parameters.DEFAULT.sourceNodes,
Balancer.Parameters.DEFAULT.blockpools, false);
final int r = Balancer.run(namenodes, p, conf);
assertEquals(ExitStatus.SUCCESS.getExitCode(), r); assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
} finally { } finally {
cluster.shutdown(); cluster.shutdown();
@ -1081,20 +1078,20 @@ public class TestBalancer {
@Test @Test
public void testBalancerCliParseBlockpools() { public void testBalancerCliParseBlockpools() {
String[] parameters = new String[] { "-blockpools", "bp-1,bp-2,bp-3" }; String[] parameters = new String[] { "-blockpools", "bp-1,bp-2,bp-3" };
Balancer.Parameters p = Balancer.Cli.parse(parameters); BalancerParameters p = Balancer.Cli.parse(parameters);
assertEquals(3, p.blockpools.size()); assertEquals(3, p.getBlockPools().size());
parameters = new String[] { "-blockpools", "bp-1" }; parameters = new String[] { "-blockpools", "bp-1" };
p = Balancer.Cli.parse(parameters); p = Balancer.Cli.parse(parameters);
assertEquals(1, p.blockpools.size()); assertEquals(1, p.getBlockPools().size());
parameters = new String[] { "-blockpools", "bp-1,,bp-2" }; parameters = new String[] { "-blockpools", "bp-1,,bp-2" };
p = Balancer.Cli.parse(parameters); p = Balancer.Cli.parse(parameters);
assertEquals(3, p.blockpools.size()); assertEquals(3, p.getBlockPools().size());
parameters = new String[] { "-blockpools", "bp-1," }; parameters = new String[] { "-blockpools", "bp-1," };
p = Balancer.Cli.parse(parameters); p = Balancer.Cli.parse(parameters);
assertEquals(1, p.blockpools.size()); assertEquals(1, p.getBlockPools().size());
} }
/** /**
@ -1123,7 +1120,8 @@ public class TestBalancer {
excludeHosts.add( "datanodeZ"); excludeHosts.add( "datanodeZ");
doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
excludeHosts, Parameters.DEFAULT.includedNodes), false, false); excludeHosts, BalancerParameters.DEFAULT.getIncludedNodes()),
false, false);
} }
/** /**
@ -1151,9 +1149,11 @@ public class TestBalancer {
Set<String> excludeHosts = new HashSet<String>(); Set<String> excludeHosts = new HashSet<String>();
excludeHosts.add( "datanodeY"); excludeHosts.add( "datanodeY");
excludeHosts.add( "datanodeZ"); excludeHosts.add( "datanodeZ");
doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, doTest(conf, new long[] { CAPACITY, CAPACITY },
new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, excludeHosts, new String[] { RACK0, RACK1 }, CAPACITY, RACK2, new HostNameBasedNodes(
Parameters.DEFAULT.includedNodes), true, false); new String[] { "datanodeX", "datanodeY", "datanodeZ" },
excludeHosts, BalancerParameters.DEFAULT.getIncludedNodes()), true,
false);
} }
/** /**
@ -1183,7 +1183,8 @@ public class TestBalancer {
excludeHosts.add( "datanodeZ"); excludeHosts.add( "datanodeZ");
doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
excludeHosts, Parameters.DEFAULT.includedNodes), true, true); excludeHosts, BalancerParameters.DEFAULT.getIncludedNodes()), true,
true);
} }
/** /**
@ -1212,7 +1213,8 @@ public class TestBalancer {
includeHosts.add( "datanodeY"); includeHosts.add( "datanodeY");
doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
Parameters.DEFAULT.excludedNodes, includeHosts), false, false); BalancerParameters.DEFAULT.getExcludedNodes(), includeHosts),
false, false);
} }
/** /**
@ -1241,7 +1243,8 @@ public class TestBalancer {
includeHosts.add( "datanodeY"); includeHosts.add( "datanodeY");
doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
Parameters.DEFAULT.excludedNodes, includeHosts), true, false); BalancerParameters.DEFAULT.getExcludedNodes(), includeHosts), true,
false);
} }
/** /**
@ -1270,7 +1273,8 @@ public class TestBalancer {
includeHosts.add( "datanodeY"); includeHosts.add( "datanodeY");
doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
Parameters.DEFAULT.excludedNodes, includeHosts), true, true); BalancerParameters.DEFAULT.getExcludedNodes(), includeHosts), true,
true);
} }
/** /**
@ -1343,7 +1347,7 @@ public class TestBalancer {
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
// Run Balancer // Run Balancer
final Balancer.Parameters p = Parameters.DEFAULT; final BalancerParameters p = BalancerParameters.DEFAULT;
final int r = Balancer.run(namenodes, p, conf); final int r = Balancer.run(namenodes, p, conf);
// Validate no RAM_DISK block should be moved // Validate no RAM_DISK block should be moved
@ -1395,7 +1399,7 @@ public class TestBalancer {
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
// Run balancer // Run balancer
final Balancer.Parameters p = Parameters.DEFAULT; final BalancerParameters p = BalancerParameters.DEFAULT;
fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER); fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.PREPARE); fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.PREPARE);
@ -1406,14 +1410,10 @@ public class TestBalancer {
Balancer.run(namenodes, p, conf)); Balancer.run(namenodes, p, conf));
// Should work with the -runDuringUpgrade flag. // Should work with the -runDuringUpgrade flag.
final Balancer.Parameters runDuringUpgrade = BalancerParameters.Builder b =
new Balancer.Parameters(Parameters.DEFAULT.policy, new BalancerParameters.Builder();
Parameters.DEFAULT.threshold, b.setRunDuringUpgrade(true);
Parameters.DEFAULT.maxIdleIteration, final BalancerParameters runDuringUpgrade = b.build();
Parameters.DEFAULT.excludedNodes,
Parameters.DEFAULT.includedNodes,
Parameters.DEFAULT.sourceNodes,
Balancer.Parameters.DEFAULT.blockpools, true);
assertEquals(ExitStatus.SUCCESS.getExitCode(), assertEquals(ExitStatus.SUCCESS.getExitCode(),
Balancer.run(namenodes, runDuringUpgrade, conf)); Balancer.run(namenodes, runDuringUpgrade, conf));
@ -1480,7 +1480,7 @@ public class TestBalancer {
// update space info // update space info
cluster.triggerHeartbeats(); cluster.triggerHeartbeats();
Balancer.Parameters p = Balancer.Parameters.DEFAULT; BalancerParameters p = BalancerParameters.DEFAULT;
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
final int r = Balancer.run(namenodes, p, conf); final int r = Balancer.run(namenodes, p, conf);
@ -1612,12 +1612,11 @@ public class TestBalancer {
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
{ // run Balancer with min-block-size=50 { // run Balancer with min-block-size=50
final Parameters p = new Parameters( BalancerParameters.Builder b =
BalancingPolicy.Node.INSTANCE, 1, new BalancerParameters.Builder();
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
Collections.<String> emptySet(), Collections.<String> emptySet(), b.setThreshold(1);
Collections.<String> emptySet(), final BalancerParameters p = b.build();
Balancer.Parameters.DEFAULT.blockpools, false);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
final int r = Balancer.run(namenodes, p, conf); final int r = Balancer.run(namenodes, p, conf);
@ -1632,11 +1631,12 @@ public class TestBalancer {
for(int i = capacities.length; i < datanodes.size(); i++) { for(int i = capacities.length; i < datanodes.size(); i++) {
sourceNodes.add(datanodes.get(i).getDisplayName()); sourceNodes.add(datanodes.get(i).getDisplayName());
} }
final Parameters p = new Parameters( BalancerParameters.Builder b =
BalancingPolicy.Node.INSTANCE, 1, new BalancerParameters.Builder();
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
Collections.<String> emptySet(), Collections.<String> emptySet(), b.setThreshold(1);
sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false); b.setSourceNodes(sourceNodes);
final BalancerParameters p = b.build();
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
final int r = Balancer.run(namenodes, p, conf); final int r = Balancer.run(namenodes, p, conf);
@ -1647,11 +1647,12 @@ public class TestBalancer {
final Set<String> sourceNodes = new HashSet<>(); final Set<String> sourceNodes = new HashSet<>();
final List<DataNode> datanodes = cluster.getDataNodes(); final List<DataNode> datanodes = cluster.getDataNodes();
sourceNodes.add(datanodes.get(0).getDisplayName()); sourceNodes.add(datanodes.get(0).getDisplayName());
final Parameters p = new Parameters( BalancerParameters.Builder b =
BalancingPolicy.Node.INSTANCE, 1, new BalancerParameters.Builder();
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
Collections.<String> emptySet(), Collections.<String> emptySet(), b.setThreshold(1);
sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false); b.setSourceNodes(sourceNodes);
final BalancerParameters p = b.build();
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
final int r = Balancer.run(namenodes, p, conf); final int r = Balancer.run(namenodes, p, conf);
@ -1664,11 +1665,12 @@ public class TestBalancer {
for(int i = 0; i < capacities.length; i++) { for(int i = 0; i < capacities.length; i++) {
sourceNodes.add(datanodes.get(i).getDisplayName()); sourceNodes.add(datanodes.get(i).getDisplayName());
} }
final Parameters p = new Parameters( BalancerParameters.Builder b =
BalancingPolicy.Node.INSTANCE, 1, new BalancerParameters.Builder();
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
Collections.<String> emptySet(), Collections.<String> emptySet(), b.setThreshold(1);
sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false); b.setSourceNodes(sourceNodes);
final BalancerParameters p = b.build();
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
final int r = Balancer.run(namenodes, p, conf); final int r = Balancer.run(namenodes, p, conf);

View File

@ -97,10 +97,10 @@ public class TestBalancerWithHANameNodes {
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
assertEquals(1, namenodes.size()); assertEquals(1, namenodes.size());
assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster))); assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster)));
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
assertEquals(ExitStatus.SUCCESS.getExitCode(), r); assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client, TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
cluster, Balancer.Parameters.DEFAULT); cluster, BalancerParameters.DEFAULT);
} finally { } finally {
cluster.shutdown(); cluster.shutdown();
} }

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.balancer.BalancerParameters;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.log4j.Level; import org.apache.log4j.Level;
@ -84,10 +85,10 @@ public class TestBalancerWithMultipleNameNodes {
final MiniDFSCluster cluster; final MiniDFSCluster cluster;
final ClientProtocol[] clients; final ClientProtocol[] clients;
final short replication; final short replication;
final Balancer.Parameters parameters; final BalancerParameters parameters;
Suite(MiniDFSCluster cluster, final int nNameNodes, final int nDataNodes, Suite(MiniDFSCluster cluster, final int nNameNodes, final int nDataNodes,
Balancer.Parameters parameters, Configuration conf) throws IOException { BalancerParameters parameters, Configuration conf) throws IOException {
this.conf = conf; this.conf = conf;
this.cluster = cluster; this.cluster = cluster;
clients = new ClientProtocol[nNameNodes]; clients = new ClientProtocol[nNameNodes];
@ -204,7 +205,7 @@ public class TestBalancerWithMultipleNameNodes {
balanced = true; balanced = true;
for(int d = 0; d < used.length; d++) { for(int d = 0; d < used.length; d++) {
final double p = used[d]*100.0/cap[d]; final double p = used[d]*100.0/cap[d];
balanced = p <= avg + s.parameters.threshold; balanced = p <= avg + s.parameters.getThreshold();
if (!balanced) { if (!balanced) {
if (i % 100 == 0) { if (i % 100 == 0) {
LOG.warn("datanodes " + d + " is not yet balanced: " LOG.warn("datanodes " + d + " is not yet balanced: "
@ -278,13 +279,14 @@ public class TestBalancerWithMultipleNameNodes {
DatanodeStorageReport[]> getStorageReports(Suite s) throws IOException { DatanodeStorageReport[]> getStorageReports(Suite s) throws IOException {
Map<Integer, DatanodeStorageReport[]> reports = Map<Integer, DatanodeStorageReport[]> reports =
new HashMap<Integer, DatanodeStorageReport[]>(); new HashMap<Integer, DatanodeStorageReport[]>();
if (s.parameters.blockpools.size() == 0) { if (s.parameters.getBlockPools().size() == 0) {
// the blockpools parameter was not set, so we don't need to track any // the blockpools parameter was not set, so we don't need to track any
// blockpools. // blockpools.
return Collections.emptyMap(); return Collections.emptyMap();
} }
for (int i = 0; i < s.clients.length; i++) { for (int i = 0; i < s.clients.length; i++) {
if (s.parameters.blockpools.contains(s.cluster.getNamesystem(i) if (s.parameters.getBlockPools().contains(
s.cluster.getNamesystem(i)
.getBlockPoolId())) { .getBlockPoolId())) {
// we want to ensure that blockpools not specified by the balancer // we want to ensure that blockpools not specified by the balancer
// parameters were left alone. Therefore, if the pool was specified, // parameters were left alone. Therefore, if the pool was specified,
@ -388,14 +390,10 @@ public class TestBalancerWithMultipleNameNodes {
for (int i = 0; i < nNameNodesToBalance; i++) { for (int i = 0; i < nNameNodesToBalance; i++) {
blockpools.add(cluster.getNamesystem(i).getBlockPoolId()); blockpools.add(cluster.getNamesystem(i).getBlockPoolId());
} }
Balancer.Parameters params = BalancerParameters.Builder b =
new Balancer.Parameters(Balancer.Parameters.DEFAULT.policy, new BalancerParameters.Builder();
Balancer.Parameters.DEFAULT.threshold, b.setBlockpools(blockpools);
Balancer.Parameters.DEFAULT.maxIdleIteration, BalancerParameters params = b.build();
Balancer.Parameters.DEFAULT.excludedNodes,
Balancer.Parameters.DEFAULT.includedNodes,
Balancer.Parameters.DEFAULT.sourceNodes, blockpools,
Balancer.Parameters.DEFAULT.runDuringUpgrade);
final Suite s = final Suite s =
new Suite(cluster, nNameNodes, nDataNodes, params, conf); new Suite(cluster, nNameNodes, nDataNodes, params, conf);
for(int n = 0; n < nNameNodes; n++) { for(int n = 0; n < nNameNodes; n++) {
@ -455,7 +453,7 @@ public class TestBalancerWithMultipleNameNodes {
LOG.info("RUN_TEST 1"); LOG.info("RUN_TEST 1");
final Suite s = final Suite s =
new Suite(cluster, nNameNodes, nDataNodes, new Suite(cluster, nNameNodes, nDataNodes,
Balancer.Parameters.DEFAULT, conf); BalancerParameters.DEFAULT, conf);
long totalCapacity = TestBalancer.sum(capacities); long totalCapacity = TestBalancer.sum(capacities);
LOG.info("RUN_TEST 2"); LOG.info("RUN_TEST 2");

View File

@ -175,7 +175,7 @@ public class TestBalancerWithNodeGroup {
// start rebalancing // start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
assertEquals(ExitStatus.SUCCESS.getExitCode(), r); assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
waitForHeartBeat(totalUsedSpace, totalCapacity); waitForHeartBeat(totalUsedSpace, totalCapacity);
@ -189,7 +189,7 @@ public class TestBalancerWithNodeGroup {
// start rebalancing // start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf); Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
Assert.assertTrue(r == ExitStatus.SUCCESS.getExitCode() || Assert.assertTrue(r == ExitStatus.SUCCESS.getExitCode() ||
(r == ExitStatus.NO_MOVE_PROGRESS.getExitCode())); (r == ExitStatus.NO_MOVE_PROGRESS.getExitCode()));
waitForHeartBeat(totalUsedSpace, totalCapacity); waitForHeartBeat(totalUsedSpace, totalCapacity);