From 3531823fcc4966e0938cd824f117f3ecec76e47d Mon Sep 17 00:00:00 2001 From: Ming Ma Date: Tue, 15 Sep 2015 10:16:02 -0700 Subject: [PATCH] HDFS-9008. Balancer#Parameters class could use a builder pattern. (Chris Trezzo via mingma) (cherry picked from commit 083b44c136ea5aba660fcd1dddbb2d21513b4456) --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hadoop/hdfs/server/balancer/Balancer.java | 134 ++++--------- .../server/balancer/BalancerParameters.java | 168 ++++++++++++++++ .../hdfs/server/balancer/TestBalancer.java | 180 +++++++++--------- .../balancer/TestBalancerWithHANameNodes.java | 4 +- .../TestBalancerWithMultipleNameNodes.java | 26 ++- .../balancer/TestBalancerWithNodeGroup.java | 4 +- 7 files changed, 317 insertions(+), 202 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 5c334f2d66c..891aefc9b2d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -567,6 +567,9 @@ Release 2.8.0 - UNRELEASED HDFS-9065. Include commas on # of files, blocks, total filesystem objects in NN Web UI. (Daniel Templeton via wheat9) + HDFS-9008. Balancer#Parameters class could use a builder pattern. + (Chris Trezzo via mingma) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index c4a4edc05f4..dcae9226400 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -244,7 +244,8 @@ static int getInt(Configuration conf, String key, int defaultValue) { * namenode as a client and a secondary namenode and retry proxies * when connection fails. */ - Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) { + Balancer(NameNodeConnector theblockpool, BalancerParameters p, + Configuration conf) { final long movedWinWidth = getLong(conf, DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT); @@ -266,13 +267,15 @@ static int getInt(Configuration conf, String key, int defaultValue) { DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT); this.nnc = theblockpool; - this.dispatcher = new Dispatcher(theblockpool, p.includedNodes, - p.excludedNodes, movedWinWidth, moverThreads, dispatcherThreads, - maxConcurrentMovesPerNode, getBlocksSize, getBlocksMinBlockSize, conf); - this.threshold = p.threshold; - this.policy = p.policy; - this.sourceNodes = p.sourceNodes; - this.runDuringUpgrade = p.runDuringUpgrade; + this.dispatcher = + new Dispatcher(theblockpool, p.getIncludedNodes(), + p.getExcludedNodes(), movedWinWidth, moverThreads, + dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize, + getBlocksMinBlockSize, conf); + this.threshold = p.getThreshold(); + this.policy = p.getBalancingPolicy(); + this.sourceNodes = p.getSourceNodes(); + this.runDuringUpgrade = p.getRunDuringUpgrade(); this.maxSizeToMove = getLong(conf, DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY, @@ -630,7 +633,7 @@ Result runOneIteration() { * for each namenode, * execute a {@link Balancer} to work through all datanodes once. */ - static int run(Collection namenodes, final Parameters p, + static int run(Collection namenodes, final BalancerParameters p, Configuration conf) throws IOException, InterruptedException { final long sleeptime = conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, @@ -639,24 +642,25 @@ static int run(Collection namenodes, final Parameters p, DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000; LOG.info("namenodes = " + namenodes); LOG.info("parameters = " + p); - LOG.info("included nodes = " + p.includedNodes); - LOG.info("excluded nodes = " + p.excludedNodes); - LOG.info("source nodes = " + p.sourceNodes); - + LOG.info("included nodes = " + p.getIncludedNodes()); + LOG.info("excluded nodes = " + p.getExcludedNodes()); + LOG.info("source nodes = " + p.getSourceNodes()); + System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved"); List connectors = Collections.emptyList(); try { connectors = NameNodeConnector.newNameNodeConnectors(namenodes, - Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf, p.maxIdleIteration); - + Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf, + p.getMaxIdleIteration()); + boolean done = false; for(int iteration = 0; !done; iteration++) { done = true; Collections.shuffle(connectors); for(NameNodeConnector nnc : connectors) { - if (p.blockpools.size() == 0 - || p.blockpools.contains(nnc.getBlockpoolID())) { + if (p.getBlockPools().size() == 0 + || p.getBlockPools().contains(nnc.getBlockpoolID())) { final Balancer b = new Balancer(nnc, p, conf); final Result r = b.runOneIteration(); r.print(iteration, System.out); @@ -706,65 +710,6 @@ private static String time2Str(long elapsedTime) { return time+" "+unit; } - static class Parameters { - static final Parameters DEFAULT = - new Parameters(BalancingPolicy.Node.INSTANCE, 10.0, - NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, - Collections. emptySet(), Collections. emptySet(), - Collections. emptySet(), Collections. emptySet(), - false); - - final BalancingPolicy policy; - final double threshold; - final int maxIdleIteration; - /** Exclude the nodes in this set. */ - final Set excludedNodes; - /** If empty, include any node; otherwise, include only these nodes. */ - final Set includedNodes; - /** If empty, any node can be a source; - * otherwise, use only these nodes as source nodes. - */ - final Set sourceNodes; - /** - * A set of block pools to run the balancer on. - */ - final Set blockpools; - /** - * Whether to run the balancer during upgrade. - */ - final boolean runDuringUpgrade; - - Parameters(BalancingPolicy policy, double threshold, int maxIdleIteration, - Set excludedNodes, Set includedNodes, - Set sourceNodes, Set blockpools, - boolean runDuringUpgrade) { - this.policy = policy; - this.threshold = threshold; - this.maxIdleIteration = maxIdleIteration; - this.excludedNodes = excludedNodes; - this.includedNodes = includedNodes; - this.sourceNodes = sourceNodes; - this.blockpools = blockpools; - this.runDuringUpgrade = runDuringUpgrade; - } - - @Override - public String toString() { - return String.format("%s.%s [%s," - + " threshold = %s," - + " max idle iteration = %s," - + " #excluded nodes = %s," - + " #included nodes = %s," - + " #source nodes = %s," - + " #blockpools = %s," - + " run during upgrade = %s]", - Balancer.class.getSimpleName(), getClass().getSimpleName(), policy, - threshold, maxIdleIteration, excludedNodes.size(), - includedNodes.size(), sourceNodes.size(), blockpools.size(), - runDuringUpgrade); - } - } - static class Cli extends Configured implements Tool { /** * Parse arguments and then run Balancer. @@ -797,15 +742,10 @@ public int run(String[] args) { } /** parse command line arguments */ - static Parameters parse(String[] args) { - BalancingPolicy policy = Parameters.DEFAULT.policy; - double threshold = Parameters.DEFAULT.threshold; - int maxIdleIteration = Parameters.DEFAULT.maxIdleIteration; - Set excludedNodes = Parameters.DEFAULT.excludedNodes; - Set includedNodes = Parameters.DEFAULT.includedNodes; - Set sourceNodes = Parameters.DEFAULT.sourceNodes; - Set blockpools = Parameters.DEFAULT.blockpools; - boolean runDuringUpgrade = Parameters.DEFAULT.runDuringUpgrade; + static BalancerParameters parse(String[] args) { + Set excludedNodes = null; + Set includedNodes = null; + BalancerParameters.Builder b = new BalancerParameters.Builder(); if (args != null) { try { @@ -814,12 +754,13 @@ static Parameters parse(String[] args) { checkArgument(++i < args.length, "Threshold value is missing: args = " + Arrays.toString(args)); try { - threshold = Double.parseDouble(args[i]); + double threshold = Double.parseDouble(args[i]); if (threshold < 1 || threshold > 100) { throw new IllegalArgumentException( "Number out of range: threshold = " + threshold); } LOG.info( "Using a threshold of " + threshold ); + b.setThreshold(threshold); } catch(IllegalArgumentException e) { System.err.println( "Expecting a number in the range of [1.0, 100.0]: " @@ -830,7 +771,7 @@ static Parameters parse(String[] args) { checkArgument(++i < args.length, "Policy value is missing: args = " + Arrays.toString(args)); try { - policy = BalancingPolicy.parse(args[i]); + b.setBalancingPolicy(BalancingPolicy.parse(args[i])); } catch(IllegalArgumentException e) { System.err.println("Illegal policy name: " + args[i]); throw e; @@ -838,28 +779,33 @@ static Parameters parse(String[] args) { } else if ("-exclude".equalsIgnoreCase(args[i])) { excludedNodes = new HashSet<>(); i = processHostList(args, i, "exclude", excludedNodes); + b.setExcludedNodes(excludedNodes); } else if ("-include".equalsIgnoreCase(args[i])) { includedNodes = new HashSet<>(); i = processHostList(args, i, "include", includedNodes); + b.setIncludedNodes(includedNodes); } else if ("-source".equalsIgnoreCase(args[i])) { - sourceNodes = new HashSet<>(); + Set sourceNodes = new HashSet<>(); i = processHostList(args, i, "source", sourceNodes); + b.setSourceNodes(sourceNodes); } else if ("-blockpools".equalsIgnoreCase(args[i])) { checkArgument( ++i < args.length, "blockpools value is missing: args = " + Arrays.toString(args)); - blockpools = parseBlockPoolList(args[i]); + Set blockpools = parseBlockPoolList(args[i]); LOG.info("Balancer will run on the following blockpools: " + blockpools.toString()); + b.setBlockpools(blockpools); } else if ("-idleiterations".equalsIgnoreCase(args[i])) { checkArgument(++i < args.length, "idleiterations value is missing: args = " + Arrays .toString(args)); - maxIdleIteration = Integer.parseInt(args[i]); + int maxIdleIteration = Integer.parseInt(args[i]); LOG.info("Using a idleiterations of " + maxIdleIteration); + b.setMaxIdleIteration(maxIdleIteration); } else if ("-runDuringUpgrade".equalsIgnoreCase(args[i])) { - runDuringUpgrade = true; + b.setRunDuringUpgrade(true); LOG.info("Will run the balancer even during an ongoing HDFS " + "upgrade. Most users will not want to run the balancer " + "during an upgrade since it will not affect used space " @@ -869,16 +815,14 @@ static Parameters parse(String[] args) { + Arrays.toString(args)); } } - checkArgument(excludedNodes.isEmpty() || includedNodes.isEmpty(), + checkArgument(excludedNodes == null || includedNodes == null, "-exclude and -include options cannot be specified together."); } catch(RuntimeException e) { printUsage(System.err); throw e; } } - - return new Parameters(policy, threshold, maxIdleIteration, excludedNodes, - includedNodes, sourceNodes, blockpools, runDuringUpgrade); + return b.build(); } private static int processHostList(String[] args, int i, String type, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java new file mode 100644 index 00000000000..5d5e9b18d7b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.balancer; + +import java.util.Collections; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience; + +@InterfaceAudience.Private +final class BalancerParameters { + private final BalancingPolicy policy; + private final double threshold; + private final int maxIdleIteration; + /** Exclude the nodes in this set. */ + private final Set excludedNodes; + /** If empty, include any node; otherwise, include only these nodes. */ + private final Set includedNodes; + /** + * If empty, any node can be a source; otherwise, use only these nodes as + * source nodes. + */ + private final Set sourceNodes; + /** + * A set of block pools to run the balancer on. + */ + private final Set blockpools; + /** + * Whether to run the balancer during upgrade. + */ + private final boolean runDuringUpgrade; + + static final BalancerParameters DEFAULT = new BalancerParameters(); + + private BalancerParameters() { + this(new Builder()); + } + + private BalancerParameters(Builder builder) { + this.policy = builder.policy; + this.threshold = builder.threshold; + this.maxIdleIteration = builder.maxIdleIteration; + this.excludedNodes = builder.excludedNodes; + this.includedNodes = builder.includedNodes; + this.sourceNodes = builder.sourceNodes; + this.blockpools = builder.blockpools; + this.runDuringUpgrade = builder.runDuringUpgrade; + } + + BalancingPolicy getBalancingPolicy() { + return this.policy; + } + + double getThreshold() { + return this.threshold; + } + + int getMaxIdleIteration() { + return this.maxIdleIteration; + } + + Set getExcludedNodes() { + return this.excludedNodes; + } + + Set getIncludedNodes() { + return this.includedNodes; + } + + Set getSourceNodes() { + return this.sourceNodes; + } + + Set getBlockPools() { + return this.blockpools; + } + + boolean getRunDuringUpgrade() { + return this.runDuringUpgrade; + } + + @Override + public String toString() { + return String.format("%s.%s [%s," + " threshold = %s," + + " max idle iteration = %s," + " #excluded nodes = %s," + + " #included nodes = %s," + " #source nodes = %s," + + " #blockpools = %s," + " run during upgrade = %s]", + Balancer.class.getSimpleName(), getClass().getSimpleName(), policy, + threshold, maxIdleIteration, excludedNodes.size(), + includedNodes.size(), sourceNodes.size(), blockpools.size(), + runDuringUpgrade); + } + + static class Builder { + // Defaults + private BalancingPolicy policy = BalancingPolicy.Node.INSTANCE; + private double threshold = 10.0; + private int maxIdleIteration = + NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS; + private Set excludedNodes = Collections. emptySet(); + private Set includedNodes = Collections. emptySet(); + private Set sourceNodes = Collections. emptySet(); + private Set blockpools = Collections. emptySet(); + private boolean runDuringUpgrade = false; + + Builder() { + } + + Builder setBalancingPolicy(BalancingPolicy p) { + this.policy = p; + return this; + } + + Builder setThreshold(double t) { + this.threshold = t; + return this; + } + + Builder setMaxIdleIteration(int m) { + this.maxIdleIteration = m; + return this; + } + + Builder setExcludedNodes(Set nodes) { + this.excludedNodes = nodes; + return this; + } + + Builder setIncludedNodes(Set nodes) { + this.includedNodes = nodes; + return this; + } + + Builder setSourceNodes(Set nodes) { + this.sourceNodes = nodes; + return this; + } + + Builder setBlockpools(Set pools) { + this.blockpools = pools; + return this; + } + + Builder setRunDuringUpgrade(boolean run) { + this.runDuringUpgrade = run; + return this; + } + + BalancerParameters build() { + return new BalancerParameters(this); + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 125d056f78c..6a59231ef4f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -75,8 +75,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli; -import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters; import org.apache.hadoop.hdfs.server.balancer.Balancer.Result; +import org.apache.hadoop.hdfs.server.balancer.BalancerParameters; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase; @@ -319,7 +319,7 @@ static void waitForHeartBeat(long expectedUsedSpace, * @throws TimeoutException */ static void waitForBalancer(long totalUsedSpace, long totalCapacity, - ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p) + ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p) throws IOException, TimeoutException { waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, 0); } @@ -377,7 +377,7 @@ public void testBalancerWithPinnedBlocks() throws Exception { // start rebalancing Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); - int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); + int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf); assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), r); } finally { @@ -393,16 +393,16 @@ public void testBalancerWithPinnedBlocks() throws Exception { * @throws TimeoutException */ static void waitForBalancer(long totalUsedSpace, long totalCapacity, - ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p, + ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p, int expectedExcludedNodes) throws IOException, TimeoutException { long timeout = TIMEOUT; long failtime = (timeout <= 0L) ? Long.MAX_VALUE : Time.monotonicNow() + timeout; - if (!p.includedNodes.isEmpty()) { - totalCapacity = p.includedNodes.size() * CAPACITY; + if (!p.getIncludedNodes().isEmpty()) { + totalCapacity = p.getIncludedNodes().size() * CAPACITY; } - if (!p.excludedNodes.isEmpty()) { - totalCapacity -= p.excludedNodes.size() * CAPACITY; + if (!p.getExcludedNodes().isEmpty()) { + totalCapacity -= p.getExcludedNodes().size() * CAPACITY; } final double avgUtilization = ((double)totalUsedSpace) / totalCapacity; boolean balanced; @@ -415,12 +415,12 @@ static void waitForBalancer(long totalUsedSpace, long totalCapacity, for (DatanodeInfo datanode : datanodeReport) { double nodeUtilization = ((double)datanode.getDfsUsed()) / datanode.getCapacity(); - if (Dispatcher.Util.isExcluded(p.excludedNodes, datanode)) { + if (Dispatcher.Util.isExcluded(p.getExcludedNodes(), datanode)) { assertTrue(nodeUtilization == 0); actualExcludedNodeCount++; continue; } - if (!Dispatcher.Util.isIncluded(p.includedNodes, datanode)) { + if (!Dispatcher.Util.isIncluded(p.getIncludedNodes(), datanode)) { assertTrue(nodeUtilization == 0); actualExcludedNodeCount++; continue; @@ -636,16 +636,14 @@ private void doTest(Configuration conf, long[] capacities, } } // run balancer and validate results - Balancer.Parameters p = Balancer.Parameters.DEFAULT; + BalancerParameters.Builder pBuilder = + new BalancerParameters.Builder(); if (nodes != null) { - p = new Balancer.Parameters( - Balancer.Parameters.DEFAULT.policy, - Balancer.Parameters.DEFAULT.threshold, - Balancer.Parameters.DEFAULT.maxIdleIteration, - nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded(), - Balancer.Parameters.DEFAULT.sourceNodes, - Balancer.Parameters.DEFAULT.blockpools, false); + pBuilder.setExcludedNodes(nodes.getNodesToBeExcluded()); + pBuilder.setIncludedNodes(nodes.getNodesToBeIncluded()); + pBuilder.setRunDuringUpgrade(false); } + BalancerParameters p = pBuilder.build(); int expectedExcludedNodes = 0; if (nodes != null) { @@ -668,14 +666,15 @@ private void doTest(Configuration conf, long[] capacities, } } - private void runBalancer(Configuration conf, - long totalUsedSpace, long totalCapacity) throws Exception { - runBalancer(conf, totalUsedSpace, totalCapacity, Balancer.Parameters.DEFAULT, 0); + private void runBalancer(Configuration conf, long totalUsedSpace, + long totalCapacity) throws Exception { + runBalancer(conf, totalUsedSpace, totalCapacity, + BalancerParameters.DEFAULT, 0); } - private void runBalancer(Configuration conf, - long totalUsedSpace, long totalCapacity, Balancer.Parameters p, - int excludedNodes) throws Exception { + private void runBalancer(Configuration conf, long totalUsedSpace, + long totalCapacity, BalancerParameters p, int excludedNodes) + throws Exception { waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); // start rebalancing @@ -693,7 +692,8 @@ private void runBalancer(Configuration conf, waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, excludedNodes); } - private static int runBalancer(Collection namenodes, final Parameters p, + private static int runBalancer(Collection namenodes, + final BalancerParameters p, Configuration conf) throws IOException, InterruptedException { final long sleeptime = conf.getLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, @@ -710,8 +710,8 @@ private static int runBalancer(Collection namenodes, final Parameters p, try { connectors = NameNodeConnector.newNameNodeConnectors(namenodes, Balancer.class.getSimpleName(), Balancer.BALANCER_ID_PATH, conf, - Balancer.Parameters.DEFAULT.maxIdleIteration); - + BalancerParameters.DEFAULT.getMaxIdleIteration()); + boolean done = false; for(int iteration = 0; !done; iteration++) { done = true; @@ -747,45 +747,45 @@ private static int runBalancer(Collection namenodes, final Parameters p, return ExitStatus.SUCCESS.getExitCode(); } - private void runBalancerCli(Configuration conf, - long totalUsedSpace, long totalCapacity, - Balancer.Parameters p, boolean useFile, int expectedExcludedNodes) throws Exception { + private void runBalancerCli(Configuration conf, long totalUsedSpace, + long totalCapacity, BalancerParameters p, boolean useFile, + int expectedExcludedNodes) throws Exception { waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); List args = new ArrayList(); args.add("-policy"); args.add("datanode"); File excludeHostsFile = null; - if (!p.excludedNodes.isEmpty()) { + if (!p.getExcludedNodes().isEmpty()) { args.add("-exclude"); if (useFile) { excludeHostsFile = new File ("exclude-hosts-file"); PrintWriter pw = new PrintWriter(excludeHostsFile); - for (String host: p.excludedNodes) { + for (String host : p.getExcludedNodes()) { pw.write( host + "\n"); } pw.close(); args.add("-f"); args.add("exclude-hosts-file"); } else { - args.add(StringUtils.join(p.excludedNodes, ',')); + args.add(StringUtils.join(p.getExcludedNodes(), ',')); } } File includeHostsFile = null; - if (!p.includedNodes.isEmpty()) { + if (!p.getIncludedNodes().isEmpty()) { args.add("-include"); if (useFile) { includeHostsFile = new File ("include-hosts-file"); PrintWriter pw = new PrintWriter(includeHostsFile); - for (String host: p.includedNodes){ + for (String host : p.getIncludedNodes()) { pw.write( host + "\n"); } pw.close(); args.add("-f"); args.add("include-hosts-file"); } else { - args.add(StringUtils.join(p.includedNodes, ',')); + args.add(StringUtils.join(p.getIncludedNodes(), ',')); } } @@ -879,14 +879,11 @@ public void testUnknownDatanode() throws Exception { Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); Set datanodes = new HashSet(); datanodes.add(cluster.getDataNodes().get(0).getDatanodeId().getHostName()); - Balancer.Parameters p = new Balancer.Parameters( - Balancer.Parameters.DEFAULT.policy, - Balancer.Parameters.DEFAULT.threshold, - Balancer.Parameters.DEFAULT.maxIdleIteration, - datanodes, Balancer.Parameters.DEFAULT.includedNodes, - Balancer.Parameters.DEFAULT.sourceNodes, - Balancer.Parameters.DEFAULT.blockpools, false); - final int r = Balancer.run(namenodes, p, conf); + BalancerParameters.Builder pBuilder = + new BalancerParameters.Builder(); + pBuilder.setExcludedNodes(datanodes); + pBuilder.setRunDuringUpgrade(false); + final int r = Balancer.run(namenodes, pBuilder.build(), conf); assertEquals(ExitStatus.SUCCESS.getExitCode(), r); } finally { cluster.shutdown(); @@ -1094,20 +1091,20 @@ public void testBalancerCliParseWithWrongParams() { @Test public void testBalancerCliParseBlockpools() { String[] parameters = new String[] { "-blockpools", "bp-1,bp-2,bp-3" }; - Balancer.Parameters p = Balancer.Cli.parse(parameters); - assertEquals(3, p.blockpools.size()); + BalancerParameters p = Balancer.Cli.parse(parameters); + assertEquals(3, p.getBlockPools().size()); parameters = new String[] { "-blockpools", "bp-1" }; p = Balancer.Cli.parse(parameters); - assertEquals(1, p.blockpools.size()); + assertEquals(1, p.getBlockPools().size()); parameters = new String[] { "-blockpools", "bp-1,,bp-2" }; p = Balancer.Cli.parse(parameters); - assertEquals(3, p.blockpools.size()); + assertEquals(3, p.getBlockPools().size()); parameters = new String[] { "-blockpools", "bp-1," }; p = Balancer.Cli.parse(parameters); - assertEquals(1, p.blockpools.size()); + assertEquals(1, p.getBlockPools().size()); } @@ -1125,7 +1122,8 @@ public void testBalancerWithExcludeList() throws Exception { excludeHosts.add( "datanodeZ"); doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, - excludeHosts, Parameters.DEFAULT.includedNodes), false, false); + excludeHosts, BalancerParameters.DEFAULT.getIncludedNodes()), + false, false); } /** @@ -1153,9 +1151,11 @@ public void testBalancerCliWithExcludeList() throws Exception { Set excludeHosts = new HashSet(); excludeHosts.add( "datanodeY"); excludeHosts.add( "datanodeZ"); - doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, - new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, excludeHosts, - Parameters.DEFAULT.includedNodes), true, false); + doTest(conf, new long[] { CAPACITY, CAPACITY }, + new String[] { RACK0, RACK1 }, CAPACITY, RACK2, new HostNameBasedNodes( + new String[] { "datanodeX", "datanodeY", "datanodeZ" }, + excludeHosts, BalancerParameters.DEFAULT.getIncludedNodes()), true, + false); } /** @@ -1185,7 +1185,8 @@ public void testBalancerCliWithExcludeListInAFile() throws Exception { excludeHosts.add( "datanodeZ"); doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, - excludeHosts, Parameters.DEFAULT.includedNodes), true, true); + excludeHosts, BalancerParameters.DEFAULT.getIncludedNodes()), true, + true); } /** @@ -1214,7 +1215,8 @@ public void testBalancerWithIncludeList() throws Exception { includeHosts.add( "datanodeY"); doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, - Parameters.DEFAULT.excludedNodes, includeHosts), false, false); + BalancerParameters.DEFAULT.getExcludedNodes(), includeHosts), + false, false); } /** @@ -1243,7 +1245,8 @@ public void testBalancerCliWithIncludeList() throws Exception { includeHosts.add( "datanodeY"); doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, - Parameters.DEFAULT.excludedNodes, includeHosts), true, false); + BalancerParameters.DEFAULT.getExcludedNodes(), includeHosts), true, + false); } /** @@ -1272,7 +1275,8 @@ public void testBalancerCliWithIncludeListInAFile() throws Exception { includeHosts.add( "datanodeY"); doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, - Parameters.DEFAULT.excludedNodes, includeHosts), true, true); + BalancerParameters.DEFAULT.getExcludedNodes(), includeHosts), true, + true); } /** @@ -1345,7 +1349,7 @@ public void testBalancerWithRamDisk() throws Exception { Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); // Run Balancer - final Balancer.Parameters p = Parameters.DEFAULT; + final BalancerParameters p = BalancerParameters.DEFAULT; final int r = Balancer.run(namenodes, p, conf); // Validate no RAM_DISK block should be moved @@ -1397,7 +1401,7 @@ public void testBalancerDuringUpgrade() throws Exception { Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); // Run balancer - final Balancer.Parameters p = Parameters.DEFAULT; + final BalancerParameters p = BalancerParameters.DEFAULT; fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER); fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.PREPARE); @@ -1408,14 +1412,10 @@ public void testBalancerDuringUpgrade() throws Exception { Balancer.run(namenodes, p, conf)); // Should work with the -runDuringUpgrade flag. - final Balancer.Parameters runDuringUpgrade = - new Balancer.Parameters(Parameters.DEFAULT.policy, - Parameters.DEFAULT.threshold, - Parameters.DEFAULT.maxIdleIteration, - Parameters.DEFAULT.excludedNodes, - Parameters.DEFAULT.includedNodes, - Parameters.DEFAULT.sourceNodes, - Balancer.Parameters.DEFAULT.blockpools, true); + BalancerParameters.Builder b = + new BalancerParameters.Builder(); + b.setRunDuringUpgrade(true); + final BalancerParameters runDuringUpgrade = b.build(); assertEquals(ExitStatus.SUCCESS.getExitCode(), Balancer.run(namenodes, runDuringUpgrade, conf)); @@ -1482,7 +1482,7 @@ public void testTwoReplicaShouldNotInSameDN() throws Exception { // update space info cluster.triggerHeartbeats(); - Balancer.Parameters p = Balancer.Parameters.DEFAULT; + BalancerParameters p = BalancerParameters.DEFAULT; Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); final int r = Balancer.run(namenodes, p, conf); @@ -1614,12 +1614,11 @@ public void testMinBlockSizeAndSourceNodes() throws Exception { final Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); { // run Balancer with min-block-size=50 - final Parameters p = new Parameters( - BalancingPolicy.Node.INSTANCE, 1, - NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, - Collections. emptySet(), Collections. emptySet(), - Collections. emptySet(), - Balancer.Parameters.DEFAULT.blockpools, false); + BalancerParameters.Builder b = + new BalancerParameters.Builder(); + b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE); + b.setThreshold(1); + final BalancerParameters p = b.build(); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50); final int r = Balancer.run(namenodes, p, conf); @@ -1634,11 +1633,12 @@ Collections. emptySet(), for(int i = capacities.length; i < datanodes.size(); i++) { sourceNodes.add(datanodes.get(i).getDisplayName()); } - final Parameters p = new Parameters( - BalancingPolicy.Node.INSTANCE, 1, - NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, - Collections. emptySet(), Collections. emptySet(), - sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false); + BalancerParameters.Builder b = + new BalancerParameters.Builder(); + b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE); + b.setThreshold(1); + b.setSourceNodes(sourceNodes); + final BalancerParameters p = b.build(); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50); final int r = Balancer.run(namenodes, p, conf); @@ -1649,11 +1649,12 @@ Collections. emptySet(), Collections. emptySet(), final Set sourceNodes = new HashSet<>(); final List datanodes = cluster.getDataNodes(); sourceNodes.add(datanodes.get(0).getDisplayName()); - final Parameters p = new Parameters( - BalancingPolicy.Node.INSTANCE, 1, - NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, - Collections. emptySet(), Collections. emptySet(), - sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false); + BalancerParameters.Builder b = + new BalancerParameters.Builder(); + b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE); + b.setThreshold(1); + b.setSourceNodes(sourceNodes); + final BalancerParameters p = b.build(); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1); final int r = Balancer.run(namenodes, p, conf); @@ -1666,11 +1667,12 @@ Collections. emptySet(), Collections. emptySet(), for(int i = 0; i < capacities.length; i++) { sourceNodes.add(datanodes.get(i).getDisplayName()); } - final Parameters p = new Parameters( - BalancingPolicy.Node.INSTANCE, 1, - NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, - Collections. emptySet(), Collections. emptySet(), - sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false); + BalancerParameters.Builder b = + new BalancerParameters.Builder(); + b.setBalancingPolicy(BalancingPolicy.Node.INSTANCE); + b.setThreshold(1); + b.setSourceNodes(sourceNodes); + final BalancerParameters p = b.build(); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1); final int r = Balancer.run(namenodes, p, conf); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java index 7559de4338f..1693cf182c0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java @@ -97,10 +97,10 @@ public void testBalancerWithHANameNodes() throws Exception { Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); assertEquals(1, namenodes.size()); assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster))); - final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); + final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf); assertEquals(ExitStatus.SUCCESS.getExitCode(), r); TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client, - cluster, Balancer.Parameters.DEFAULT); + cluster, BalancerParameters.DEFAULT); } finally { cluster.shutdown(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java index b07ad89aa38..c5d16ab6109 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.balancer.BalancerParameters; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.log4j.Level; @@ -84,10 +85,10 @@ private static class Suite { final MiniDFSCluster cluster; final ClientProtocol[] clients; final short replication; - final Balancer.Parameters parameters; + final BalancerParameters parameters; Suite(MiniDFSCluster cluster, final int nNameNodes, final int nDataNodes, - Balancer.Parameters parameters, Configuration conf) throws IOException { + BalancerParameters parameters, Configuration conf) throws IOException { this.conf = conf; this.cluster = cluster; clients = new ClientProtocol[nNameNodes]; @@ -204,7 +205,7 @@ static void runBalancer(Suite s, balanced = true; for(int d = 0; d < used.length; d++) { final double p = used[d]*100.0/cap[d]; - balanced = p <= avg + s.parameters.threshold; + balanced = p <= avg + s.parameters.getThreshold(); if (!balanced) { if (i % 100 == 0) { LOG.warn("datanodes " + d + " is not yet balanced: " @@ -278,13 +279,14 @@ private static long getTotalPoolUsage(DatanodeStorageReport report) { DatanodeStorageReport[]> getStorageReports(Suite s) throws IOException { Map reports = new HashMap(); - if (s.parameters.blockpools.size() == 0) { + if (s.parameters.getBlockPools().size() == 0) { // the blockpools parameter was not set, so we don't need to track any // blockpools. return Collections.emptyMap(); } for (int i = 0; i < s.clients.length; i++) { - if (s.parameters.blockpools.contains(s.cluster.getNamesystem(i) + if (s.parameters.getBlockPools().contains( + s.cluster.getNamesystem(i) .getBlockPoolId())) { // we want to ensure that blockpools not specified by the balancer // parameters were left alone. Therefore, if the pool was specified, @@ -388,14 +390,10 @@ private void unevenDistribution(final int nNameNodes, for (int i = 0; i < nNameNodesToBalance; i++) { blockpools.add(cluster.getNamesystem(i).getBlockPoolId()); } - Balancer.Parameters params = - new Balancer.Parameters(Balancer.Parameters.DEFAULT.policy, - Balancer.Parameters.DEFAULT.threshold, - Balancer.Parameters.DEFAULT.maxIdleIteration, - Balancer.Parameters.DEFAULT.excludedNodes, - Balancer.Parameters.DEFAULT.includedNodes, - Balancer.Parameters.DEFAULT.sourceNodes, blockpools, - Balancer.Parameters.DEFAULT.runDuringUpgrade); + BalancerParameters.Builder b = + new BalancerParameters.Builder(); + b.setBlockpools(blockpools); + BalancerParameters params = b.build(); final Suite s = new Suite(cluster, nNameNodes, nDataNodes, params, conf); for(int n = 0; n < nNameNodes; n++) { @@ -455,7 +453,7 @@ private void runTest(final int nNameNodes, long[] capacities, String[] racks, LOG.info("RUN_TEST 1"); final Suite s = new Suite(cluster, nNameNodes, nDataNodes, - Balancer.Parameters.DEFAULT, conf); + BalancerParameters.DEFAULT, conf); long totalCapacity = TestBalancer.sum(capacities); LOG.info("RUN_TEST 2"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java index 7af3a0e7d7d..bfa2835ac7f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java @@ -175,7 +175,7 @@ private void runBalancer(Configuration conf, // start rebalancing Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); - final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); + final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf); assertEquals(ExitStatus.SUCCESS.getExitCode(), r); waitForHeartBeat(totalUsedSpace, totalCapacity); @@ -189,7 +189,7 @@ private void runBalancerCanFinish(Configuration conf, // start rebalancing Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); - final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); + final int r = Balancer.run(namenodes, BalancerParameters.DEFAULT, conf); Assert.assertTrue(r == ExitStatus.SUCCESS.getExitCode() || (r == ExitStatus.NO_MOVE_PROGRESS.getExitCode())); waitForHeartBeat(totalUsedSpace, totalCapacity);