From f81d12668f6c5d8b2d1689ccac2c6ecf91a4eee6 Mon Sep 17 00:00:00 2001 From: Ming Ma Date: Wed, 2 Sep 2015 15:55:42 -0700 Subject: [PATCH] HDFS-8890. Allow admin to specify which blockpools the balancer should run on. (Chris Trezzo via mingma) (cherry picked from commit d31a41c35927f02f2fb40d19380b5df4bb2b6d57) --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hadoop/hdfs/server/balancer/Balancer.java | 80 +++++--- .../src/site/markdown/HDFSCommands.md | 2 + .../hdfs/server/balancer/TestBalancer.java | 43 ++++- .../TestBalancerWithMultipleNameNodes.java | 179 +++++++++++++++--- 5 files changed, 252 insertions(+), 55 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index def33d3c631..9553ec36d79 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -535,6 +535,9 @@ Release 2.8.0 - UNRELEASED HDFS-328. Improve fs -setrep error message for invalid replication factors. (Daniel Templeton via wang) + HDFS-8890. Allow admin to specify which blockpools the balancer should run + on. (Chris Trezzo via mingma) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index 9d3ddd4ede4..c4a4edc05f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -180,6 +180,8 @@ public class Balancer { + "\tExcludes the specified datanodes." + "\n\t[-include [-f | ]]" + "\tIncludes only the specified datanodes." + + "\n\t[-blockpools ]" + + "\tThe balancer will only run on blockpools included in this list." + "\n\t[-idleiterations ]" + "\tNumber of consecutive idle iterations (-1 for Infinite) before " + "exit." @@ -653,23 +655,28 @@ static int run(Collection namenodes, final Parameters p, done = true; Collections.shuffle(connectors); for(NameNodeConnector nnc : connectors) { - final Balancer b = new Balancer(nnc, p, conf); - final Result r = b.runOneIteration(); - r.print(iteration, System.out); + if (p.blockpools.size() == 0 + || p.blockpools.contains(nnc.getBlockpoolID())) { + final Balancer b = new Balancer(nnc, p, conf); + final Result r = b.runOneIteration(); + r.print(iteration, System.out); - // clean all lists - b.resetData(conf); - if (r.exitStatus == ExitStatus.IN_PROGRESS) { - done = false; - } else if (r.exitStatus != ExitStatus.SUCCESS) { - //must be an error statue, return. - return r.exitStatus.getExitCode(); + // clean all lists + b.resetData(conf); + if (r.exitStatus == ExitStatus.IN_PROGRESS) { + done = false; + } else if (r.exitStatus != ExitStatus.SUCCESS) { + // must be an error statue, return. + return r.exitStatus.getExitCode(); + } + + if (!done) { + Thread.sleep(sleeptime); + } + } else { + LOG.info("Skipping blockpool " + nnc.getBlockpoolID()); } } - - if (!done) { - Thread.sleep(sleeptime); - } } } finally { for(NameNodeConnector nnc : connectors) { @@ -700,12 +707,12 @@ private static String time2Str(long elapsedTime) { } static class Parameters { - static final Parameters DEFAULT = new Parameters( - BalancingPolicy.Node.INSTANCE, 10.0, - NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, - Collections.emptySet(), Collections.emptySet(), - Collections.emptySet(), - false); + static final Parameters DEFAULT = + new Parameters(BalancingPolicy.Node.INSTANCE, 10.0, + NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, + Collections. emptySet(), Collections. emptySet(), + Collections. emptySet(), Collections. emptySet(), + false); final BalancingPolicy policy; final double threshold; @@ -718,6 +725,10 @@ static class Parameters { * otherwise, use only these nodes as source nodes. */ final Set sourceNodes; + /** + * A set of block pools to run the balancer on. + */ + final Set blockpools; /** * Whether to run the balancer during upgrade. */ @@ -725,13 +736,15 @@ static class Parameters { Parameters(BalancingPolicy policy, double threshold, int maxIdleIteration, Set excludedNodes, Set includedNodes, - Set sourceNodes, boolean runDuringUpgrade) { + Set sourceNodes, Set blockpools, + boolean runDuringUpgrade) { this.policy = policy; this.threshold = threshold; this.maxIdleIteration = maxIdleIteration; this.excludedNodes = excludedNodes; this.includedNodes = includedNodes; this.sourceNodes = sourceNodes; + this.blockpools = blockpools; this.runDuringUpgrade = runDuringUpgrade; } @@ -743,10 +756,11 @@ public String toString() { + " #excluded nodes = %s," + " #included nodes = %s," + " #source nodes = %s," + + " #blockpools = %s," + " run during upgrade = %s]", - Balancer.class.getSimpleName(), getClass().getSimpleName(), - policy, threshold, maxIdleIteration, - excludedNodes.size(), includedNodes.size(), sourceNodes.size(), + Balancer.class.getSimpleName(), getClass().getSimpleName(), policy, + threshold, maxIdleIteration, excludedNodes.size(), + includedNodes.size(), sourceNodes.size(), blockpools.size(), runDuringUpgrade); } } @@ -790,6 +804,7 @@ static Parameters parse(String[] args) { Set excludedNodes = Parameters.DEFAULT.excludedNodes; Set includedNodes = Parameters.DEFAULT.includedNodes; Set sourceNodes = Parameters.DEFAULT.sourceNodes; + Set blockpools = Parameters.DEFAULT.blockpools; boolean runDuringUpgrade = Parameters.DEFAULT.runDuringUpgrade; if (args != null) { @@ -829,6 +844,14 @@ static Parameters parse(String[] args) { } else if ("-source".equalsIgnoreCase(args[i])) { sourceNodes = new HashSet<>(); i = processHostList(args, i, "source", sourceNodes); + } else if ("-blockpools".equalsIgnoreCase(args[i])) { + checkArgument( + ++i < args.length, + "blockpools value is missing: args = " + + Arrays.toString(args)); + blockpools = parseBlockPoolList(args[i]); + LOG.info("Balancer will run on the following blockpools: " + + blockpools.toString()); } else if ("-idleiterations".equalsIgnoreCase(args[i])) { checkArgument(++i < args.length, "idleiterations value is missing: args = " + Arrays @@ -854,8 +877,8 @@ static Parameters parse(String[] args) { } } - return new Parameters(policy, threshold, maxIdleIteration, - excludedNodes, includedNodes, sourceNodes, runDuringUpgrade); + return new Parameters(policy, threshold, maxIdleIteration, excludedNodes, + includedNodes, sourceNodes, blockpools, runDuringUpgrade); } private static int processHostList(String[] args, int i, String type, @@ -882,6 +905,11 @@ private static int processHostList(String[] args, int i, String type, return i; } + private static Set parseBlockPoolList(String string) { + String[] addrs = StringUtils.getTrimmedStrings(string); + return new HashSet(Arrays.asList(addrs)); + } + private static void printUsage(PrintStream out) { out.println(USAGE + "\n"); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md index 5fa7220f72f..b4b870b93a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md @@ -265,6 +265,7 @@ Usage: [-policy ] [-exclude [-f | ]] [-include [-f | ]] + [-blockpools ] [-idleiterations ] | COMMAND\_OPTION | Description | @@ -273,6 +274,7 @@ Usage: | `-threshold` \ | Percentage of disk capacity. This overwrites the default threshold. | | `-exclude -f` \ \| \ | Excludes the specified datanodes from being balanced by the balancer. | | `-include -f` \ \| \ | Includes only the specified datanodes to be balanced by the balancer. | +| `-blockpools` \ | The balancer will only run on blockpools included in this list. | | `-idleiterations` \ | Maximum number of idle iterations before exit. This overwrites the default idleiterations(5). | Runs a cluster balancing utility. An administrator can simply press Ctrl-C to stop the rebalancing process. See [Balancer](./HdfsUserGuide.html#Balancer) for more details. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 5a8c9f84f9a..125d056f78c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -644,7 +644,7 @@ private void doTest(Configuration conf, long[] capacities, Balancer.Parameters.DEFAULT.maxIdleIteration, nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded(), Balancer.Parameters.DEFAULT.sourceNodes, - false); + Balancer.Parameters.DEFAULT.blockpools, false); } int expectedExcludedNodes = 0; @@ -885,7 +885,7 @@ public void testUnknownDatanode() throws Exception { Balancer.Parameters.DEFAULT.maxIdleIteration, datanodes, Balancer.Parameters.DEFAULT.includedNodes, Balancer.Parameters.DEFAULT.sourceNodes, - false); + Balancer.Parameters.DEFAULT.blockpools, false); final int r = Balancer.run(namenodes, p, conf); assertEquals(ExitStatus.SUCCESS.getExitCode(), r); } finally { @@ -1080,6 +1080,34 @@ public void testBalancerCliParseWithWrongParams() { } catch (IllegalArgumentException e) { } + + parameters = new String[] { "-blockpools" }; + try { + Balancer.Cli.parse(parameters); + fail("IllegalArgumentException is expected when a value " + + "is not specified for the blockpool flag"); + } catch (IllegalArgumentException e) { + + } + } + + @Test + public void testBalancerCliParseBlockpools() { + String[] parameters = new String[] { "-blockpools", "bp-1,bp-2,bp-3" }; + Balancer.Parameters p = Balancer.Cli.parse(parameters); + assertEquals(3, p.blockpools.size()); + + parameters = new String[] { "-blockpools", "bp-1" }; + p = Balancer.Cli.parse(parameters); + assertEquals(1, p.blockpools.size()); + + parameters = new String[] { "-blockpools", "bp-1,,bp-2" }; + p = Balancer.Cli.parse(parameters); + assertEquals(3, p.blockpools.size()); + + parameters = new String[] { "-blockpools", "bp-1," }; + p = Balancer.Cli.parse(parameters); + assertEquals(1, p.blockpools.size()); } @@ -1387,7 +1415,7 @@ public void testBalancerDuringUpgrade() throws Exception { Parameters.DEFAULT.excludedNodes, Parameters.DEFAULT.includedNodes, Parameters.DEFAULT.sourceNodes, - true); + Balancer.Parameters.DEFAULT.blockpools, true); assertEquals(ExitStatus.SUCCESS.getExitCode(), Balancer.run(namenodes, runDuringUpgrade, conf)); @@ -1590,7 +1618,8 @@ public void testMinBlockSizeAndSourceNodes() throws Exception { BalancingPolicy.Node.INSTANCE, 1, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, Collections. emptySet(), Collections. emptySet(), - Collections. emptySet(), false); + Collections. emptySet(), + Balancer.Parameters.DEFAULT.blockpools, false); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50); final int r = Balancer.run(namenodes, p, conf); @@ -1609,7 +1638,7 @@ Collections. emptySet(), Collections. emptySet(), BalancingPolicy.Node.INSTANCE, 1, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, Collections. emptySet(), Collections. emptySet(), - sourceNodes, false); + sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50); final int r = Balancer.run(namenodes, p, conf); @@ -1624,7 +1653,7 @@ Collections. emptySet(), Collections. emptySet(), BalancingPolicy.Node.INSTANCE, 1, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, Collections. emptySet(), Collections. emptySet(), - sourceNodes, false); + sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1); final int r = Balancer.run(namenodes, p, conf); @@ -1641,7 +1670,7 @@ Collections. emptySet(), Collections. emptySet(), BalancingPolicy.Node.INSTANCE, 1, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, Collections. emptySet(), Collections. emptySet(), - sourceNodes, false); + sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1); final int r = Balancer.run(namenodes, p, conf); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java index f51757c9e83..b07ad89aa38 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java @@ -21,8 +21,13 @@ import java.net.URI; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; @@ -42,6 +47,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.log4j.Level; import org.junit.Assert; import org.junit.Test; @@ -60,6 +67,7 @@ public class TestBalancerWithMultipleNameNodes { private static final long CAPACITY = 500L; private static final String RACK0 = "/rack0"; private static final String RACK1 = "/rack1"; + private static final String RACK2 = "/rack2"; private static final String FILE_NAME = "/tmp.txt"; private static final Path FILE_PATH = new Path(FILE_NAME); @@ -76,16 +84,20 @@ private static class Suite { final MiniDFSCluster cluster; final ClientProtocol[] clients; final short replication; - + final Balancer.Parameters parameters; + Suite(MiniDFSCluster cluster, final int nNameNodes, final int nDataNodes, - Configuration conf) throws IOException { + Balancer.Parameters parameters, Configuration conf) throws IOException { this.conf = conf; this.cluster = cluster; clients = new ClientProtocol[nNameNodes]; for(int i = 0; i < nNameNodes; i++) { clients[i] = cluster.getNameNode(i).getRpcServer(); } - replication = (short)Math.max(1, nDataNodes - 1); + // hard coding replication factor to 1 so logical and raw HDFS size are + // equal + replication = 1; + this.parameters = parameters; } } @@ -104,11 +116,9 @@ private static ExtendedBlock[][] generateBlocks(Suite s, long size ) throws IOException, InterruptedException, TimeoutException { final ExtendedBlock[][] blocks = new ExtendedBlock[s.clients.length][]; for(int n = 0; n < s.clients.length; n++) { - final long fileLen = size/s.replication; - createFile(s, n, fileLen); - - final List locatedBlocks = s.clients[n].getBlockLocations( - FILE_NAME, 0, fileLen).getLocatedBlocks(); + createFile(s, n, size); + final List locatedBlocks = + s.clients[n].getBlockLocations(FILE_NAME, 0, size).getLocatedBlocks(); final int numOfBlocks = locatedBlocks.size(); blocks[n] = new ExtendedBlock[numOfBlocks]; @@ -151,9 +161,14 @@ static void runBalancer(Suite s, wait(s.clients, totalUsed, totalCapacity); LOG.info("BALANCER 1"); + // get storage reports for relevant blockpools so that we can compare + // blockpool usages after balancer has run + Map preBalancerPoolUsages = + getStorageReports(s); + // start rebalancing final Collection namenodes = DFSUtil.getNsServiceRpcUris(s.conf); - final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, s.conf); + final int r = Balancer.run(namenodes, s.parameters, s.conf); Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), r); LOG.info("BALANCER 2"); @@ -189,7 +204,7 @@ static void runBalancer(Suite s, balanced = true; for(int d = 0; d < used.length; d++) { final double p = used[d]*100.0/cap[d]; - balanced = p <= avg + Balancer.Parameters.DEFAULT.threshold; + balanced = p <= avg + s.parameters.threshold; if (!balanced) { if (i % 100 == 0) { LOG.warn("datanodes " + d + " is not yet balanced: " @@ -203,6 +218,89 @@ static void runBalancer(Suite s, } } LOG.info("BALANCER 6"); + // cluster is balanced, verify that only selected blockpools were touched + Map postBalancerPoolUsages = + getStorageReports(s); + Assert.assertEquals(preBalancerPoolUsages.size(), + postBalancerPoolUsages.size()); + for (Map.Entry entry + : preBalancerPoolUsages.entrySet()) { + compareTotalPoolUsage(entry.getValue(), + postBalancerPoolUsages.get(entry.getKey())); + } + } + + /** + * Compare the total blockpool usage on each datanode to ensure that nothing + * was balanced. + * + * @param preReports storage reports from pre balancer run + * @param postReports storage reports from post balancer run + */ + private static void compareTotalPoolUsage(DatanodeStorageReport[] preReports, + DatanodeStorageReport[] postReports) { + Assert.assertNotNull(preReports); + Assert.assertNotNull(postReports); + Assert.assertEquals(preReports.length, postReports.length); + for (DatanodeStorageReport preReport : preReports) { + String dnUuid = preReport.getDatanodeInfo().getDatanodeUuid(); + for(DatanodeStorageReport postReport : postReports) { + if(postReport.getDatanodeInfo().getDatanodeUuid().equals(dnUuid)) { + Assert.assertEquals(getTotalPoolUsage(preReport), + getTotalPoolUsage(postReport)); + LOG.info("Comparision of datanode pool usage pre/post balancer run. " + + "PrePoolUsage: " + getTotalPoolUsage(preReport) + + ", PostPoolUsage: " + getTotalPoolUsage(postReport)); + break; + } + } + } + } + + private static long getTotalPoolUsage(DatanodeStorageReport report) { + long usage = 0L; + for (StorageReport sr : report.getStorageReports()) { + usage += sr.getBlockPoolUsed(); + } + return usage; + } + + /** + * Get the storage reports for all blockpools that were not specified by the + * balancer blockpool parameters. If none were specified then the parameter + * was not set and do not return any reports. + * + * @param s suite for the test + * @return a map of storage reports where the key is the blockpool index + * @throws IOException + */ + private static Map getStorageReports(Suite s) throws IOException { + Map reports = + new HashMap(); + if (s.parameters.blockpools.size() == 0) { + // the blockpools parameter was not set, so we don't need to track any + // blockpools. + return Collections.emptyMap(); + } + for (int i = 0; i < s.clients.length; i++) { + if (s.parameters.blockpools.contains(s.cluster.getNamesystem(i) + .getBlockPoolId())) { + // we want to ensure that blockpools not specified by the balancer + // parameters were left alone. Therefore, if the pool was specified, + // skip it. Note: this code assumes the clients in the suite are ordered + // the same way that they are indexed via cluster#getNamesystem(index). + continue; + } else { + LOG.info("Tracking usage of blockpool id: " + + s.cluster.getNamesystem(i).getBlockPoolId()); + reports.put(i, + s.clients[i].getDatanodeStorageReport(DatanodeReportType.LIVE)); + } + } + LOG.info("Tracking " + reports.size() + + " blockpool(s) for pre/post balancer usage."); + return reports; } private static void sleep(long ms) { @@ -220,25 +318,31 @@ private static Configuration createConf() { } /** - * First start a cluster and fill the cluster up to a certain size. - * Then redistribute blocks according the required distribution. - * Finally, balance the cluster. - * + * First start a cluster and fill the cluster up to a certain size. Then + * redistribute blocks according the required distribution. Finally, balance + * the cluster. + * * @param nNameNodes Number of NameNodes - * @param distributionPerNN The distribution for each NameNode. + * @param nNameNodesToBalance Number of NameNodes to run the balancer on + * @param distributionPerNN The distribution for each NameNode. * @param capacities Capacities of the datanodes * @param racks Rack names * @param conf Configuration */ private void unevenDistribution(final int nNameNodes, - long distributionPerNN[], long capacities[], String[] racks, - Configuration conf) throws Exception { + final int nNameNodesToBalance, long distributionPerNN[], + long capacities[], String[] racks, Configuration conf) throws Exception { LOG.info("UNEVEN 0"); final int nDataNodes = distributionPerNN.length; if (capacities.length != nDataNodes || racks.length != nDataNodes) { throw new IllegalArgumentException("Array length is not the same"); } + if (nNameNodesToBalance > nNameNodes) { + throw new IllegalArgumentException("Number of namenodes to balance is " + + "greater than the number of namenodes."); + } + // calculate total space that need to be filled final long usedSpacePerNN = TestBalancer.sum(distributionPerNN); @@ -248,7 +352,7 @@ private void unevenDistribution(final int nNameNodes, LOG.info("UNEVEN 1"); final MiniDFSCluster cluster = new MiniDFSCluster .Builder(new Configuration(conf)) - .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2)) + .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(nNameNodes)) .numDataNodes(nDataNodes) .racks(racks) .simulatedCapacities(capacities) @@ -258,7 +362,7 @@ private void unevenDistribution(final int nNameNodes, cluster.waitActive(); DFSTestUtil.setFederatedConfiguration(cluster, conf); LOG.info("UNEVEN 3"); - final Suite s = new Suite(cluster, nNameNodes, nDataNodes, conf); + final Suite s = new Suite(cluster, nNameNodes, nDataNodes, null, conf); blocks = generateBlocks(s, usedSpacePerNN); LOG.info("UNEVEN 4"); } finally { @@ -280,7 +384,20 @@ private void unevenDistribution(final int nNameNodes, try { cluster.waitActive(); LOG.info("UNEVEN 12"); - final Suite s = new Suite(cluster, nNameNodes, nDataNodes, conf); + Set blockpools = new HashSet(); + for (int i = 0; i < nNameNodesToBalance; i++) { + blockpools.add(cluster.getNamesystem(i).getBlockPoolId()); + } + Balancer.Parameters params = + new Balancer.Parameters(Balancer.Parameters.DEFAULT.policy, + Balancer.Parameters.DEFAULT.threshold, + Balancer.Parameters.DEFAULT.maxIdleIteration, + Balancer.Parameters.DEFAULT.excludedNodes, + Balancer.Parameters.DEFAULT.includedNodes, + Balancer.Parameters.DEFAULT.sourceNodes, blockpools, + Balancer.Parameters.DEFAULT.runDuringUpgrade); + final Suite s = + new Suite(cluster, nNameNodes, nDataNodes, params, conf); for(int n = 0; n < nNameNodes; n++) { // redistribute blocks final Block[][] blocksDN = TestBalancer.distributeBlocks( @@ -336,7 +453,9 @@ private void runTest(final int nNameNodes, long[] capacities, String[] racks, try { cluster.waitActive(); LOG.info("RUN_TEST 1"); - final Suite s = new Suite(cluster, nNameNodes, nDataNodes, conf); + final Suite s = + new Suite(cluster, nNameNodes, nDataNodes, + Balancer.Parameters.DEFAULT, conf); long totalCapacity = TestBalancer.sum(capacities); LOG.info("RUN_TEST 2"); @@ -378,10 +497,26 @@ public void testBalancer() throws Exception { @Test public void testUnevenDistribution() throws Exception { final Configuration conf = createConf(); - unevenDistribution(2, + unevenDistribution(2, 2, new long[] {30*CAPACITY/100, 5*CAPACITY/100}, new long[]{CAPACITY, CAPACITY}, new String[] {RACK0, RACK1}, conf); } + + @Test + public void testBalancing1OutOf2Blockpools() throws Exception { + final Configuration conf = createConf(); + unevenDistribution(2, 1, new long[] { 30 * CAPACITY / 100, + 5 * CAPACITY / 100 }, new long[] { CAPACITY, CAPACITY }, new String[] { + RACK0, RACK1 }, conf); + } + + @Test + public void testBalancing2OutOf3Blockpools() throws Exception { + final Configuration conf = createConf(); + unevenDistribution(3, 2, new long[] { 30 * CAPACITY / 100, + 5 * CAPACITY / 100, 10 * CAPACITY / 100 }, new long[] { CAPACITY, + CAPACITY, CAPACITY }, new String[] { RACK0, RACK1, RACK2 }, conf); + } }