HDFS-9008. Balancer#Parameters class could use a builder pattern. (Chris Trezzo via mingma)

(cherry picked from commit 083b44c136)
This commit is contained in:
Ming Ma 2015-09-15 10:16:02 -07:00
parent df714e25aa
commit 3531823fcc
7 changed files with 317 additions and 202 deletions

View File

@ -567,6 +567,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9065. Include commas on # of files, blocks, total filesystem objects
in NN Web UI. (Daniel Templeton via wheat9)
HDFS-9008. Balancer#Parameters class could use a builder pattern.
(Chris Trezzo via mingma)
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -244,7 +244,8 @@ static int getInt(Configuration conf, String key, int defaultValue) {
* namenode as a client and a secondary namenode and retry proxies
* when connection fails.
*/
Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
Balancer(NameNodeConnector theblockpool, BalancerParameters p,
Configuration conf) {
final long movedWinWidth = getLong(conf,
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
@ -266,13 +267,15 @@ static int getInt(Configuration conf, String key, int defaultValue) {
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);
this.nnc = theblockpool;
this.dispatcher = new Dispatcher(theblockpool, p.includedNodes,
p.excludedNodes, movedWinWidth, moverThreads, dispatcherThreads,
maxConcurrentMovesPerNode, getBlocksSize, getBlocksMinBlockSize, conf);
this.threshold = p.threshold;
this.policy = p.policy;
this.sourceNodes = p.sourceNodes;
this.runDuringUpgrade = p.runDuringUpgrade;
this.dispatcher =
new Dispatcher(theblockpool, p.getIncludedNodes(),
p.getExcludedNodes(), movedWinWidth, moverThreads,
dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize,
getBlocksMinBlockSize, conf);
this.threshold = p.getThreshold();
this.policy = p.getBalancingPolicy();
this.sourceNodes = p.getSourceNodes();
this.runDuringUpgrade = p.getRunDuringUpgrade();
this.maxSizeToMove = getLong(conf,
DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY,
@ -630,7 +633,7 @@ Result runOneIteration() {
* for each namenode,
* execute a {@link Balancer} to work through all datanodes once.
*/
static int run(Collection<URI> namenodes, final Parameters p,
static int run(Collection<URI> namenodes, final BalancerParameters p,
Configuration conf) throws IOException, InterruptedException {
final long sleeptime =
conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
@ -639,24 +642,25 @@ static int run(Collection<URI> namenodes, final Parameters p,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000;
LOG.info("namenodes = " + namenodes);
LOG.info("parameters = " + p);
LOG.info("included nodes = " + p.includedNodes);
LOG.info("excluded nodes = " + p.excludedNodes);
LOG.info("source nodes = " + p.sourceNodes);
LOG.info("included nodes = " + p.getIncludedNodes());
LOG.info("excluded nodes = " + p.getExcludedNodes());
LOG.info("source nodes = " + p.getSourceNodes());
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
List<NameNodeConnector> connectors = Collections.emptyList();
try {
connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf, p.maxIdleIteration);
Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf,
p.getMaxIdleIteration());
boolean done = false;
for(int iteration = 0; !done; iteration++) {
done = true;
Collections.shuffle(connectors);
for(NameNodeConnector nnc : connectors) {
if (p.blockpools.size() == 0
|| p.blockpools.contains(nnc.getBlockpoolID())) {
if (p.getBlockPools().size() == 0
|| p.getBlockPools().contains(nnc.getBlockpoolID())) {
final Balancer b = new Balancer(nnc, p, conf);
final Result r = b.runOneIteration();
r.print(iteration, System.out);
@ -706,65 +710,6 @@ private static String time2Str(long elapsedTime) {
return time+" "+unit;
}
static class Parameters {
static final Parameters DEFAULT =
new Parameters(BalancingPolicy.Node.INSTANCE, 10.0,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
Collections.<String> emptySet(), Collections.<String> emptySet(),
Collections.<String> emptySet(), Collections.<String> emptySet(),
false);
final BalancingPolicy policy;
final double threshold;
final int maxIdleIteration;
/** Exclude the nodes in this set. */
final Set<String> excludedNodes;
/** If empty, include any node; otherwise, include only these nodes. */
final Set<String> includedNodes;
/** If empty, any node can be a source;
* otherwise, use only these nodes as source nodes.
*/
final Set<String> sourceNodes;
/**
* A set of block pools to run the balancer on.
*/
final Set<String> blockpools;
/**
* Whether to run the balancer during upgrade.
*/
final boolean runDuringUpgrade;
Parameters(BalancingPolicy policy, double threshold, int maxIdleIteration,
Set<String> excludedNodes, Set<String> includedNodes,
Set<String> sourceNodes, Set<String> blockpools,
boolean runDuringUpgrade) {
this.policy = policy;
this.threshold = threshold;
this.maxIdleIteration = maxIdleIteration;
this.excludedNodes = excludedNodes;
this.includedNodes = includedNodes;
this.sourceNodes = sourceNodes;
this.blockpools = blockpools;
this.runDuringUpgrade = runDuringUpgrade;
}
@Override
public String toString() {
return String.format("%s.%s [%s,"
+ " threshold = %s,"
+ " max idle iteration = %s,"
+ " #excluded nodes = %s,"
+ " #included nodes = %s,"
+ " #source nodes = %s,"
+ " #blockpools = %s,"
+ " run during upgrade = %s]",
Balancer.class.getSimpleName(), getClass().getSimpleName(), policy,
threshold, maxIdleIteration, excludedNodes.size(),
includedNodes.size(), sourceNodes.size(), blockpools.size(),
runDuringUpgrade);
}
}
static class Cli extends Configured implements Tool {
/**
* Parse arguments and then run Balancer.
@ -797,15 +742,10 @@ public int run(String[] args) {
}
/** parse command line arguments */
static Parameters parse(String[] args) {
BalancingPolicy policy = Parameters.DEFAULT.policy;
double threshold = Parameters.DEFAULT.threshold;
int maxIdleIteration = Parameters.DEFAULT.maxIdleIteration;
Set<String> excludedNodes = Parameters.DEFAULT.excludedNodes;
Set<String> includedNodes = Parameters.DEFAULT.includedNodes;
Set<String> sourceNodes = Parameters.DEFAULT.sourceNodes;
Set<String> blockpools = Parameters.DEFAULT.blockpools;
boolean runDuringUpgrade = Parameters.DEFAULT.runDuringUpgrade;
static BalancerParameters parse(String[] args) {
Set<String> excludedNodes = null;
Set<String> includedNodes = null;
BalancerParameters.Builder b = new BalancerParameters.Builder();
if (args != null) {
try {
@ -814,12 +754,13 @@ static Parameters parse(String[] args) {
checkArgument(++i < args.length,
"Threshold value is missing: args = " + Arrays.toString(args));
try {
threshold = Double.parseDouble(args[i]);
double threshold = Double.parseDouble(args[i]);
if (threshold < 1 || threshold > 100) {
throw new IllegalArgumentException(
"Number out of range: threshold = " + threshold);
}
LOG.info( "Using a threshold of " + threshold );
b.setThreshold(threshold);
} catch(IllegalArgumentException e) {
System.err.println(
"Expecting a number in the range of [1.0, 100.0]: "
@ -830,7 +771,7 @@ static Parameters parse(String[] args) {
checkArgument(++i < args.length,
"Policy value is missing: args = " + Arrays.toString(args));
try {
policy = BalancingPolicy.parse(args[i]);
b.setBalancingPolicy(BalancingPolicy.parse(args[i]));
} catch(IllegalArgumentException e) {
System.err.println("Illegal policy name: " + args[i]);
throw e;
@ -838,28 +779,33 @@ static Parameters parse(String[] args) {
} else if ("-exclude".equalsIgnoreCase(args[i])) {
excludedNodes = new HashSet<>();
i = processHostList(args, i, "exclude", excludedNodes);
b.setExcludedNodes(excludedNodes);
} else if ("-include".equalsIgnoreCase(args[i])) {
includedNodes = new HashSet<>();
i = processHostList(args, i, "include", includedNodes);
b.setIncludedNodes(includedNodes);
} else if ("-source".equalsIgnoreCase(args[i])) {
sourceNodes = new HashSet<>();
Set<String> sourceNodes = new HashSet<>();
i = processHostList(args, i, "source", sourceNodes);
b.setSourceNodes(sourceNodes);
} else if ("-blockpools".equalsIgnoreCase(args[i])) {
checkArgument(
++i < args.length,
"blockpools value is missing: args = "
+ Arrays.toString(args));
blockpools = parseBlockPoolList(args[i]);
Set<String> blockpools = parseBlockPoolList(args[i]);
LOG.info("Balancer will run on the following blockpools: "
+ blockpools.toString());
b.setBlockpools(blockpools);
} else if ("-idleiterations".equalsIgnoreCase(args[i])) {
checkArgument(++i < args.length,
"idleiterations value is missing: args = " + Arrays
.toString(args));
maxIdleIteration = Integer.parseInt(args[i]);
int maxIdleIteration = Integer.parseInt(args[i]);
LOG.info("Using a idleiterations of " + maxIdleIteration);
b.setMaxIdleIteration(maxIdleIteration);
} else if ("-runDuringUpgrade".equalsIgnoreCase(args[i])) {
runDuringUpgrade = true;
b.setRunDuringUpgrade(true);
LOG.info("Will run the balancer even during an ongoing HDFS "
+ "upgrade. Most users will not want to run the balancer "
+ "during an upgrade since it will not affect used space "
@ -869,16 +815,14 @@ static Parameters parse(String[] args) {
+ Arrays.toString(args));
}
}
checkArgument(excludedNodes.isEmpty() || includedNodes.isEmpty(),
checkArgument(excludedNodes == null || includedNodes == null,
"-exclude and -include options cannot be specified together.");
} catch(RuntimeException e) {
printUsage(System.err);
throw e;
}
}
return new Parameters(policy, threshold, maxIdleIteration, excludedNodes,
includedNodes, sourceNodes, blockpools, runDuringUpgrade);
return b.build();
}
private static int processHostList(String[] args, int i, String type,

View File

@ -0,0 +1,168 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.balancer;
import java.util.Collections;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
final class BalancerParameters {
private final BalancingPolicy policy;
private final double threshold;
private final int maxIdleIteration;
/** Exclude the nodes in this set. */
private final Set<String> excludedNodes;
/** If empty, include any node; otherwise, include only these nodes. */
private final Set<String> includedNodes;
/**
* If empty, any node can be a source; otherwise, use only these nodes as
* source nodes.
*/
private final Set<String> sourceNodes;
/**
* A set of block pools to run the balancer on.
*/
private final Set<String> blockpools;
/**
* Whether to run the balancer during upgrade.
*/
private final boolean runDuringUpgrade;
static final BalancerParameters DEFAULT = new BalancerParameters();
private BalancerParameters() {
this(new Builder());
}
private BalancerParameters(Builder builder) {
this.policy = builder.policy;
this.threshold = builder.threshold;
this.maxIdleIteration = builder.maxIdleIteration;
this.excludedNodes = builder.excludedNodes;
this.includedNodes = builder.includedNodes;
this.sourceNodes = builder.sourceNodes;
this.blockpools = builder.blockpools;
this.runDuringUpgrade = builder.runDuringUpgrade;
}
BalancingPolicy getBalancingPolicy() {
return this.policy;
}
double getThreshold() {
return this.threshold;
}
int getMaxIdleIteration() {
return this.maxIdleIteration;
}
Set<String> getExcludedNodes() {
return this.excludedNodes;
}
Set<String> getIncludedNodes() {
return this.includedNodes;
}
Set<String> getSourceNodes() {
return this.sourceNodes;
}
Set<String> getBlockPools() {
return this.blockpools;
}
boolean getRunDuringUpgrade() {
return this.runDuringUpgrade;
}
@Override
public String toString() {
return String.format("%s.%s [%s," + " threshold = %s,"
+ " max idle iteration = %s," + " #excluded nodes = %s,"
+ " #included nodes = %s," + " #source nodes = %s,"
+ " #blockpools = %s," + " run during upgrade = %s]",
Balancer.class.getSimpleName(), getClass().getSimpleName(), policy,
threshold, maxIdleIteration, excludedNodes.size(),
includedNodes.size(), sourceNodes.size(), blockpools.size(),
runDuringUpgrade);
}
static class Builder {
// Defaults
private BalancingPolicy policy = BalancingPolicy.Node.INSTANCE;
private double threshold = 10.0;
private int maxIdleIteration =
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS;
private Set<String> excludedNodes = Collections.<String> emptySet();
private Set<String> includedNodes = Collections.<String> emptySet();
private Set<String> sourceNodes = Collections.<String> emptySet();
private Set<String> blockpools = Collections.<String> emptySet();
private boolean runDuringUpgrade = false;
Builder() {
}
Builder setBalancingPolicy(BalancingPolicy p) {
this.policy = p;
return this;
}
Builder setThreshold(double t) {
this.threshold = t;
return this;
}
Builder setMaxIdleIteration(int m) {
this.maxIdleIteration = m;
return this;
}
Builder setExcludedNodes(Set<String> nodes) {
this.excludedNodes = nodes;
return this;
}
Builder setIncludedNodes(Set<String> nodes) {
this.includedNodes = nodes;
return this;
}
Builder setSourceNodes(Set<String> nodes) {
this.sourceNodes = nodes;
return this;
}
Builder setBlockpools(Set<String> pools) {
this.blockpools = pools;
return this;
}
Builder setRunDuringUpgrade(boolean run) {
this.runDuringUpgrade = run;
return this;
}
BalancerParameters build() {
return new BalancerParameters(this);
}
}
}

View File

@ -75,8 +75,8 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
import org.apache.hadoop.hdfs.server.balancer.BalancerParameters;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
@ -319,7 +319,7 @@ static void waitForHeartBeat(long expectedUsedSpace,
* @throws TimeoutException
*/
static void waitForBalancer(long totalUsedSpace, long totalCapacity,
ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p)
ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p)
throws IOException, TimeoutException {
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, 0);
}
@ -377,7 +377,7 @@ public void testBalancerWithPinnedBlocks() throws Exception {
// start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r);
} finally {
@ -393,16 +393,16 @@ public void testBalancerWithPinnedBlocks() throws Exception {
* @throws TimeoutException
*/
static void waitForBalancer(long totalUsedSpace, long totalCapacity,
ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p,
ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p,
int expectedExcludedNodes) throws IOException, TimeoutException {
long timeout = TIMEOUT;
long failtime = (timeout <= 0L) ? Long.MAX_VALUE
: Time.monotonicNow() + timeout;
if (!p.includedNodes.isEmpty()) {
totalCapacity = p.includedNodes.size() * CAPACITY;
if (!p.getIncludedNodes().isEmpty()) {
totalCapacity = p.getIncludedNodes().size() * CAPACITY;
}
if (!p.excludedNodes.isEmpty()) {
totalCapacity -= p.excludedNodes.size() * CAPACITY;
if (!p.getExcludedNodes().isEmpty()) {
totalCapacity -= p.getExcludedNodes().size() * CAPACITY;
}
final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
boolean balanced;
@ -415,12 +415,12 @@ static void waitForBalancer(long totalUsedSpace, long totalCapacity,
for (DatanodeInfo datanode : datanodeReport) {
double nodeUtilization = ((double)datanode.getDfsUsed())
/ datanode.getCapacity();
if (Dispatcher.Util.isExcluded(p.excludedNodes, datanode)) {
if (Dispatcher.Util.isExcluded(p.getExcludedNodes(), datanode)) {
assertTrue(nodeUtilization == 0);
actualExcludedNodeCount++;
continue;
}
if (!Dispatcher.Util.isIncluded(p.includedNodes, datanode)) {
if (!Dispatcher.Util.isIncluded(p.getIncludedNodes(), datanode)) {
assertTrue(nodeUtilization == 0);
actualExcludedNodeCount++;
continue;
@ -636,16 +636,14 @@ private void doTest(Configuration conf, long[] capacities,
}
}
// run balancer and validate results
Balancer.Parameters p = Balancer.Parameters.DEFAULT;
BalancerParameters.Builder pBuilder =
new BalancerParameters.Builder();
if (nodes != null) {
p = new Balancer.Parameters(
Balancer.Parameters.DEFAULT.policy,
Balancer.Parameters.DEFAULT.threshold,
Balancer.Parameters.DEFAULT.maxIdleIteration,
nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded(),
Balancer.Parameters.DEFAULT.sourceNodes,
Balancer.Parameters.DEFAULT.blockpools, false);
pBuilder.setExcludedNodes(nodes.getNodesToBeExcluded());
pBuilder.setIncludedNodes(nodes.getNodesToBeIncluded());
pBuilder.setRunDuringUpgrade(false);
}
BalancerParameters p = pBuilder.build();
int expectedExcludedNodes = 0;
if (nodes != null) {
@ -668,14 +666,15 @@ private void doTest(Configuration conf, long[] capacities,
}
}
private void runBalancer(Configuration conf,
long totalUsedSpace, long totalCapacity) throws Exception {
runBalancer(conf, totalUsedSpace, totalCapacity, Balancer.Parameters.DEFAULT, 0);
private void runBalancer(Configuration conf, long totalUsedSpace,
long totalCapacity) throws Exception {
runBalancer(conf, totalUsedSpace, totalCapacity,
BalancerParameters.DEFAULT, 0);
}
private void runBalancer(Configuration conf,
long totalUsedSpace, long totalCapacity, Balancer.Parameters p,
int excludedNodes) throws Exception {
private void runBalancer(Configuration conf, long totalUsedSpace,
long totalCapacity, BalancerParameters p, int excludedNodes)
throws Exception {
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
// start rebalancing
@ -693,7 +692,8 @@ private void runBalancer(Configuration conf,
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, excludedNodes);
}
private static int runBalancer(Collection<URI> namenodes, final Parameters p,
private static int runBalancer(Collection<URI> namenodes,
final BalancerParameters p,
Configuration conf) throws IOException, InterruptedException {
final long sleeptime =
conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
@ -710,8 +710,8 @@ private static int runBalancer(Collection<URI> namenodes, final Parameters p,
try {
connectors = NameNodeConnector.newNameNodeConnectors(namenodes,
Balancer.class.getSimpleName(), Balancer.BALANCER_ID_PATH, conf,
Balancer.Parameters.DEFAULT.maxIdleIteration);
BalancerParameters.DEFAULT.getMaxIdleIteration());
boolean done = false;
for(int iteration = 0; !done; iteration++) {
done = true;
@ -747,45 +747,45 @@ private static int runBalancer(Collection<URI> namenodes, final Parameters p,
return ExitStatus.SUCCESS.getExitCode();
}
private void runBalancerCli(Configuration conf,
long totalUsedSpace, long totalCapacity,
Balancer.Parameters p, boolean useFile, int expectedExcludedNodes) throws Exception {
private void runBalancerCli(Configuration conf, long totalUsedSpace,
long totalCapacity, BalancerParameters p, boolean useFile,
int expectedExcludedNodes) throws Exception {
waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
List <String> args = new ArrayList<String>();
args.add("-policy");
args.add("datanode");
File excludeHostsFile = null;
if (!p.excludedNodes.isEmpty()) {
if (!p.getExcludedNodes().isEmpty()) {
args.add("-exclude");
if (useFile) {
excludeHostsFile = new File ("exclude-hosts-file");
PrintWriter pw = new PrintWriter(excludeHostsFile);
for (String host: p.excludedNodes) {
for (String host : p.getExcludedNodes()) {
pw.write( host + "\n");
}
pw.close();
args.add("-f");
args.add("exclude-hosts-file");
} else {
args.add(StringUtils.join(p.excludedNodes, ','));
args.add(StringUtils.join(p.getExcludedNodes(), ','));
}
}
File includeHostsFile = null;
if (!p.includedNodes.isEmpty()) {
if (!p.getIncludedNodes().isEmpty()) {
args.add("-include");
if (useFile) {
includeHostsFile = new File ("include-hosts-file");
PrintWriter pw = new PrintWriter(includeHostsFile);
for (String host: p.includedNodes){
for (String host : p.getIncludedNodes()) {
pw.write( host + "\n");
}
pw.close();
args.add("-f");
args.add("include-hosts-file");
} else {
args.add(StringUtils.join(p.includedNodes, ','));
args.add(StringUtils.join(p.getIncludedNodes(), ','));
}
}
@ -879,14 +879,11 @@ public void testUnknownDatanode() throws Exception {
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Set<String> datanodes = new HashSet<String>();
datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName());
Balancer.Parameters p = new Balancer.Parameters(
Balancer.Parameters.DEFAULT.policy,
Balancer.Parameters.DEFAULT.threshold,
Balancer.Parameters.DEFAULT.maxIdleIteration,
datanodes, Balancer.Parameters.DEFAULT.includedNodes,
Balancer.Parameters.DEFAULT.sourceNodes,
Balancer.Parameters.DEFAULT.blockpools, false);
final int r = Balancer.run(namenodes, p, conf);
BalancerParameters.Builder pBuilder =
new BalancerParameters.Builder();
pBuilder.setExcludedNodes(datanodes);
pBuilder.setRunDuringUpgrade(false);
final int r = Balancer.run(namenodes, pBuilder.build(), conf);
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
} finally {
cluster.shutdown();
@ -1094,20 +1091,20 @@ public void testBalancerCliParseWithWrongParams() {
@Test
public void testBalancerCliParseBlockpools() {
String[] parameters = new String[] { "-blockpools", "bp-1,bp-2,bp-3" };
Balancer.Parameters p = Balancer.Cli.parse(parameters);
assertEquals(3, p.blockpools.size());
BalancerParameters p = Balancer.Cli.parse(parameters);
assertEquals(3, p.getBlockPools().size());
parameters = new String[] { "-blockpools", "bp-1" };
p = Balancer.Cli.parse(parameters);
assertEquals(1, p.blockpools.size());
assertEquals(1, p.getBlockPools().size());
parameters = new String[] { "-blockpools", "bp-1,,bp-2" };
p = Balancer.Cli.parse(parameters);
assertEquals(3, p.blockpools.size());
assertEquals(3, p.getBlockPools().size());
parameters = new String[] { "-blockpools", "bp-1," };
p = Balancer.Cli.parse(parameters);
assertEquals(1, p.blockpools.size());
assertEquals(1, p.getBlockPools().size());
}
@ -1125,7 +1122,8 @@ public void testBalancerWithExcludeList() throws Exception {
excludeHosts.add( "datanodeZ");
doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
excludeHosts, Parameters.DEFAULT.includedNodes), false, false);
excludeHosts, BalancerParameters.DEFAULT.getIncludedNodes()),
false, false);
}
/**
@ -1153,9 +1151,11 @@ public void testBalancerCliWithExcludeList() throws Exception {
Set<String> excludeHosts = new HashSet<String>();
excludeHosts.add( "datanodeY");
excludeHosts.add( "datanodeZ");
doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, excludeHosts,
Parameters.DEFAULT.includedNodes), true, false);
doTest(conf, new long[] { CAPACITY, CAPACITY },
new String[] { RACK0, RACK1 }, CAPACITY, RACK2, new HostNameBasedNodes(
new String[] { "datanodeX", "datanodeY", "datanodeZ" },
excludeHosts, BalancerParameters.DEFAULT.getIncludedNodes()), true,
false);
}
/**
@ -1185,7 +1185,8 @@ public void testBalancerCliWithExcludeListInAFile() throws Exception {
excludeHosts.add( "datanodeZ");
doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
excludeHosts, Parameters.DEFAULT.includedNodes), true, true);
excludeHosts, BalancerParameters.DEFAULT.getIncludedNodes()), true,
true);
}
/**
@ -1214,7 +1215,8 @@ public void testBalancerWithIncludeList() throws Exception {
includeHosts.add( "datanodeY");
doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
Parameters.DEFAULT.excludedNodes, includeHosts), false, false);
BalancerParameters.DEFAULT.getExcludedNodes(), includeHosts),
false, false);
}
/**
@ -1243,7 +1245,8 @@ public void testBalancerCliWithIncludeList() throws Exception {
includeHosts.add( "datanodeY");
doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
Parameters.DEFAULT.excludedNodes, includeHosts), true, false);
BalancerParameters.DEFAULT.getExcludedNodes(), includeHosts), true,
false);
}
/**
@ -1272,7 +1275,8 @@ public void testBalancerCliWithIncludeListInAFile() throws Exception {
includeHosts.add( "datanodeY");
doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
Parameters.DEFAULT.excludedNodes, includeHosts), true, true);
BalancerParameters.DEFAULT.getExcludedNodes(), includeHosts), true,
true);
}
/**
@ -1345,7 +1349,7 @@ public void testBalancerWithRamDisk() throws Exception {
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
// Run Balancer
final Balancer.Parameters p = Parameters.DEFAULT;
final BalancerParameters p = BalancerParameters.DEFAULT;
final int r = Balancer.run(namenodes, p, conf);
// Validate no RAM_DISK block should be moved
@ -1397,7 +1401,7 @@ public void testBalancerDuringUpgrade() throws Exception {
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
// Run balancer
final Balancer.Parameters p = Parameters.DEFAULT;
final BalancerParameters p = BalancerParameters.DEFAULT;
fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.PREPARE);
@ -1408,14 +1412,10 @@ public void testBalancerDuringUpgrade() throws Exception {
Balancer.run(namenodes, p, conf));
// Should work with the -runDuringUpgrade flag.
final Balancer.Parameters runDuringUpgrade =
new Balancer.Parameters(Parameters.DEFAULT.policy,
Parameters.DEFAULT.threshold,
Parameters.DEFAULT.maxIdleIteration,
Parameters.DEFAULT.excludedNodes,
Parameters.DEFAULT.includedNodes,
Parameters.DEFAULT.sourceNodes,
Balancer.Parameters.DEFAULT.blockpools, true);
BalancerParameters.Builder b =
new BalancerParameters.Builder();
b.setRunDuringUpgrade(true);
final BalancerParameters runDuringUpgrade = b.build();
assertEquals(ExitStatus.SUCCESS.getExitCode(),
Balancer.run(namenodes, runDuringUpgrade, conf));
@ -1482,7 +1482,7 @@ public void testTwoReplicaShouldNotInSameDN() throws Exception {
// update space info
cluster.triggerHeartbeats();
Balancer.Parameters p = Balancer.Parameters.DEFAULT;
BalancerParameters p = BalancerParameters.DEFAULT;
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
final int r = Balancer.run(namenodes, p, conf);
@ -1614,12 +1614,11 @@ public void testMinBlockSizeAndSourceNodes() throws Exception {
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
{ // run Balancer with min-block-size=50
final Parameters p = new Parameters(
BalancingPolicy.Node.INSTANCE, 1,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
Collections.<String> emptySet(), Collections.<String> emptySet(),
Collections.<String> emptySet(),
Balancer.Parameters.DEFAULT.blockpools, false);
BalancerParameters.Builder b =
new BalancerParameters.Builder();
b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
b.setThreshold(1);
final BalancerParameters p = b.build();
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
final int r = Balancer.run(namenodes, p, conf);
@ -1634,11 +1633,12 @@ Collections.<String> emptySet(),
for(int i = capacities.length; i < datanodes.size(); i++) {
sourceNodes.add(datanodes.get(i).getDisplayName());
}
final Parameters p = new Parameters(
BalancingPolicy.Node.INSTANCE, 1,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
Collections.<String> emptySet(), Collections.<String> emptySet(),
sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false);
BalancerParameters.Builder b =
new BalancerParameters.Builder();
b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
b.setThreshold(1);
b.setSourceNodes(sourceNodes);
final BalancerParameters p = b.build();
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
final int r = Balancer.run(namenodes, p, conf);
@ -1649,11 +1649,12 @@ Collections.<String> emptySet(), Collections.<String> emptySet(),
final Set<String> sourceNodes = new HashSet<>();
final List<DataNode> datanodes = cluster.getDataNodes();
sourceNodes.add(datanodes.get(0).getDisplayName());
final Parameters p = new Parameters(
BalancingPolicy.Node.INSTANCE, 1,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
Collections.<String> emptySet(), Collections.<String> emptySet(),
sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false);
BalancerParameters.Builder b =
new BalancerParameters.Builder();
b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
b.setThreshold(1);
b.setSourceNodes(sourceNodes);
final BalancerParameters p = b.build();
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
final int r = Balancer.run(namenodes, p, conf);
@ -1666,11 +1667,12 @@ Collections.<String> emptySet(), Collections.<String> emptySet(),
for(int i = 0; i < capacities.length; i++) {
sourceNodes.add(datanodes.get(i).getDisplayName());
}
final Parameters p = new Parameters(
BalancingPolicy.Node.INSTANCE, 1,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
Collections.<String> emptySet(), Collections.<String> emptySet(),
sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false);
BalancerParameters.Builder b =
new BalancerParameters.Builder();
b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE);
b.setThreshold(1);
b.setSourceNodes(sourceNodes);
final BalancerParameters p = b.build();
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
final int r = Balancer.run(namenodes, p, conf);

View File

@ -97,10 +97,10 @@ public void testBalancerWithHANameNodes() throws Exception {
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
assertEquals(1, namenodes.size());
assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster)));
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
cluster, Balancer.Parameters.DEFAULT);
cluster, BalancerParameters.DEFAULT);
} finally {
cluster.shutdown();
}

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.balancer.BalancerParameters;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.log4j.Level;
@ -84,10 +85,10 @@ private static class Suite {
final MiniDFSCluster cluster;
final ClientProtocol[] clients;
final short replication;
final Balancer.Parameters parameters;
final BalancerParameters parameters;
Suite(MiniDFSCluster cluster, final int nNameNodes, final int nDataNodes,
Balancer.Parameters parameters, Configuration conf) throws IOException {
BalancerParameters parameters, Configuration conf) throws IOException {
this.conf = conf;
this.cluster = cluster;
clients = new ClientProtocol[nNameNodes];
@ -204,7 +205,7 @@ static void runBalancer(Suite s,
balanced = true;
for(int d = 0; d < used.length; d++) {
final double p = used[d]*100.0/cap[d];
balanced = p <= avg + s.parameters.threshold;
balanced = p <= avg + s.parameters.getThreshold();
if (!balanced) {
if (i % 100 == 0) {
LOG.warn("datanodes " + d + " is not yet balanced: "
@ -278,13 +279,14 @@ private static long getTotalPoolUsage(DatanodeStorageReport report) {
DatanodeStorageReport[]> getStorageReports(Suite s) throws IOException {
Map<Integer, DatanodeStorageReport[]> reports =
new HashMap<Integer, DatanodeStorageReport[]>();
if (s.parameters.blockpools.size() == 0) {
if (s.parameters.getBlockPools().size() == 0) {
// the blockpools parameter was not set, so we don't need to track any
// blockpools.
return Collections.emptyMap();
}
for (int i = 0; i < s.clients.length; i++) {
if (s.parameters.blockpools.contains(s.cluster.getNamesystem(i)
if (s.parameters.getBlockPools().contains(
s.cluster.getNamesystem(i)
.getBlockPoolId())) {
// we want to ensure that blockpools not specified by the balancer
// parameters were left alone. Therefore, if the pool was specified,
@ -388,14 +390,10 @@ private void unevenDistribution(final int nNameNodes,
for (int i = 0; i < nNameNodesToBalance; i++) {
blockpools.add(cluster.getNamesystem(i).getBlockPoolId());
}
Balancer.Parameters params =
new Balancer.Parameters(Balancer.Parameters.DEFAULT.policy,
Balancer.Parameters.DEFAULT.threshold,
Balancer.Parameters.DEFAULT.maxIdleIteration,
Balancer.Parameters.DEFAULT.excludedNodes,
Balancer.Parameters.DEFAULT.includedNodes,
Balancer.Parameters.DEFAULT.sourceNodes, blockpools,
Balancer.Parameters.DEFAULT.runDuringUpgrade);
BalancerParameters.Builder b =
new BalancerParameters.Builder();
b.setBlockpools(blockpools);
BalancerParameters params = b.build();
final Suite s =
new Suite(cluster, nNameNodes, nDataNodes, params, conf);
for(int n = 0; n < nNameNodes; n++) {
@ -455,7 +453,7 @@ private void runTest(final int nNameNodes, long[] capacities, String[] racks,
LOG.info("RUN_TEST 1");
final Suite s =
new Suite(cluster, nNameNodes, nDataNodes,
Balancer.Parameters.DEFAULT, conf);
BalancerParameters.DEFAULT, conf);
long totalCapacity = TestBalancer.sum(capacities);
LOG.info("RUN_TEST 2");

View File

@ -175,7 +175,7 @@ private void runBalancer(Configuration conf,
// start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
waitForHeartBeat(totalUsedSpace, totalCapacity);
@ -189,7 +189,7 @@ private void runBalancerCanFinish(Configuration conf,
// start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf);
Assert.assertTrue(r == ExitStatus.SUCCESS.getExitCode() ||
(r == ExitStatus.NO_MOVE_PROGRESS.getExitCode()));
waitForHeartBeat(totalUsedSpace, totalCapacity);