HDFS-8890. Allow admin to specify which blockpools the balancer should run on. (Chris Trezzo via mingma)

(cherry picked from commit d31a41c359)
This commit is contained in:
Ming Ma 2015-09-02 15:55:42 -07:00
parent 1d56325a80
commit f81d12668f
5 changed files with 252 additions and 55 deletions

View File

@ -535,6 +535,9 @@ Release 2.8.0 - UNRELEASED
HDFS-328. Improve fs -setrep error message for invalid replication factors. HDFS-328. Improve fs -setrep error message for invalid replication factors.
(Daniel Templeton via wang) (Daniel Templeton via wang)
HDFS-8890. Allow admin to specify which blockpools the balancer should run
on. (Chris Trezzo via mingma)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -180,6 +180,8 @@ public class Balancer {
+ "\tExcludes the specified datanodes." + "\tExcludes the specified datanodes."
+ "\n\t[-include [-f <hosts-file> | <comma-separated list of hosts>]]" + "\n\t[-include [-f <hosts-file> | <comma-separated list of hosts>]]"
+ "\tIncludes only the specified datanodes." + "\tIncludes only the specified datanodes."
+ "\n\t[-blockpools <comma-separated list of blockpool ids>]"
+ "\tThe balancer will only run on blockpools included in this list."
+ "\n\t[-idleiterations <idleiterations>]" + "\n\t[-idleiterations <idleiterations>]"
+ "\tNumber of consecutive idle iterations (-1 for Infinite) before " + "\tNumber of consecutive idle iterations (-1 for Infinite) before "
+ "exit." + "exit."
@ -653,23 +655,28 @@ public class Balancer {
done = true; done = true;
Collections.shuffle(connectors); Collections.shuffle(connectors);
for(NameNodeConnector nnc : connectors) { for(NameNodeConnector nnc : connectors) {
final Balancer b = new Balancer(nnc, p, conf); if (p.blockpools.size() == 0
final Result r = b.runOneIteration(); || p.blockpools.contains(nnc.getBlockpoolID())) {
r.print(iteration, System.out); final Balancer b = new Balancer(nnc, p, conf);
final Result r = b.runOneIteration();
r.print(iteration, System.out);
// clean all lists // clean all lists
b.resetData(conf); b.resetData(conf);
if (r.exitStatus == ExitStatus.IN_PROGRESS) { if (r.exitStatus == ExitStatus.IN_PROGRESS) {
done = false; done = false;
} else if (r.exitStatus != ExitStatus.SUCCESS) { } else if (r.exitStatus != ExitStatus.SUCCESS) {
//must be an error statue, return. // must be an error statue, return.
return r.exitStatus.getExitCode(); return r.exitStatus.getExitCode();
}
if (!done) {
Thread.sleep(sleeptime);
}
} else {
LOG.info("Skipping blockpool " + nnc.getBlockpoolID());
} }
} }
if (!done) {
Thread.sleep(sleeptime);
}
} }
} finally { } finally {
for(NameNodeConnector nnc : connectors) { for(NameNodeConnector nnc : connectors) {
@ -700,12 +707,12 @@ public class Balancer {
} }
static class Parameters { static class Parameters {
static final Parameters DEFAULT = new Parameters( static final Parameters DEFAULT =
BalancingPolicy.Node.INSTANCE, 10.0, new Parameters(BalancingPolicy.Node.INSTANCE, 10.0,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
Collections.<String>emptySet(), Collections.<String>emptySet(), Collections.<String> emptySet(), Collections.<String> emptySet(),
Collections.<String>emptySet(), Collections.<String> emptySet(), Collections.<String> emptySet(),
false); false);
final BalancingPolicy policy; final BalancingPolicy policy;
final double threshold; final double threshold;
@ -718,6 +725,10 @@ public class Balancer {
* otherwise, use only these nodes as source nodes. * otherwise, use only these nodes as source nodes.
*/ */
final Set<String> sourceNodes; final Set<String> sourceNodes;
/**
* A set of block pools to run the balancer on.
*/
final Set<String> blockpools;
/** /**
* Whether to run the balancer during upgrade. * Whether to run the balancer during upgrade.
*/ */
@ -725,13 +736,15 @@ public class Balancer {
Parameters(BalancingPolicy policy, double threshold, int maxIdleIteration, Parameters(BalancingPolicy policy, double threshold, int maxIdleIteration,
Set<String> excludedNodes, Set<String> includedNodes, Set<String> excludedNodes, Set<String> includedNodes,
Set<String> sourceNodes, boolean runDuringUpgrade) { Set<String> sourceNodes, Set<String> blockpools,
boolean runDuringUpgrade) {
this.policy = policy; this.policy = policy;
this.threshold = threshold; this.threshold = threshold;
this.maxIdleIteration = maxIdleIteration; this.maxIdleIteration = maxIdleIteration;
this.excludedNodes = excludedNodes; this.excludedNodes = excludedNodes;
this.includedNodes = includedNodes; this.includedNodes = includedNodes;
this.sourceNodes = sourceNodes; this.sourceNodes = sourceNodes;
this.blockpools = blockpools;
this.runDuringUpgrade = runDuringUpgrade; this.runDuringUpgrade = runDuringUpgrade;
} }
@ -743,10 +756,11 @@ public class Balancer {
+ " #excluded nodes = %s," + " #excluded nodes = %s,"
+ " #included nodes = %s," + " #included nodes = %s,"
+ " #source nodes = %s," + " #source nodes = %s,"
+ " #blockpools = %s,"
+ " run during upgrade = %s]", + " run during upgrade = %s]",
Balancer.class.getSimpleName(), getClass().getSimpleName(), Balancer.class.getSimpleName(), getClass().getSimpleName(), policy,
policy, threshold, maxIdleIteration, threshold, maxIdleIteration, excludedNodes.size(),
excludedNodes.size(), includedNodes.size(), sourceNodes.size(), includedNodes.size(), sourceNodes.size(), blockpools.size(),
runDuringUpgrade); runDuringUpgrade);
} }
} }
@ -790,6 +804,7 @@ public class Balancer {
Set<String> excludedNodes = Parameters.DEFAULT.excludedNodes; Set<String> excludedNodes = Parameters.DEFAULT.excludedNodes;
Set<String> includedNodes = Parameters.DEFAULT.includedNodes; Set<String> includedNodes = Parameters.DEFAULT.includedNodes;
Set<String> sourceNodes = Parameters.DEFAULT.sourceNodes; Set<String> sourceNodes = Parameters.DEFAULT.sourceNodes;
Set<String> blockpools = Parameters.DEFAULT.blockpools;
boolean runDuringUpgrade = Parameters.DEFAULT.runDuringUpgrade; boolean runDuringUpgrade = Parameters.DEFAULT.runDuringUpgrade;
if (args != null) { if (args != null) {
@ -829,6 +844,14 @@ public class Balancer {
} else if ("-source".equalsIgnoreCase(args[i])) { } else if ("-source".equalsIgnoreCase(args[i])) {
sourceNodes = new HashSet<>(); sourceNodes = new HashSet<>();
i = processHostList(args, i, "source", sourceNodes); i = processHostList(args, i, "source", sourceNodes);
} else if ("-blockpools".equalsIgnoreCase(args[i])) {
checkArgument(
++i < args.length,
"blockpools value is missing: args = "
+ Arrays.toString(args));
blockpools = parseBlockPoolList(args[i]);
LOG.info("Balancer will run on the following blockpools: "
+ blockpools.toString());
} else if ("-idleiterations".equalsIgnoreCase(args[i])) { } else if ("-idleiterations".equalsIgnoreCase(args[i])) {
checkArgument(++i < args.length, checkArgument(++i < args.length,
"idleiterations value is missing: args = " + Arrays "idleiterations value is missing: args = " + Arrays
@ -854,8 +877,8 @@ public class Balancer {
} }
} }
return new Parameters(policy, threshold, maxIdleIteration, return new Parameters(policy, threshold, maxIdleIteration, excludedNodes,
excludedNodes, includedNodes, sourceNodes, runDuringUpgrade); includedNodes, sourceNodes, blockpools, runDuringUpgrade);
} }
private static int processHostList(String[] args, int i, String type, private static int processHostList(String[] args, int i, String type,
@ -882,6 +905,11 @@ public class Balancer {
return i; return i;
} }
private static Set<String> parseBlockPoolList(String string) {
String[] addrs = StringUtils.getTrimmedStrings(string);
return new HashSet<String>(Arrays.asList(addrs));
}
private static void printUsage(PrintStream out) { private static void printUsage(PrintStream out) {
out.println(USAGE + "\n"); out.println(USAGE + "\n");
} }

View File

@ -265,6 +265,7 @@ Usage:
[-policy <policy>] [-policy <policy>]
[-exclude [-f <hosts-file> | <comma-separated list of hosts>]] [-exclude [-f <hosts-file> | <comma-separated list of hosts>]]
[-include [-f <hosts-file> | <comma-separated list of hosts>]] [-include [-f <hosts-file> | <comma-separated list of hosts>]]
[-blockpools <comma-separated list of blockpool ids>]
[-idleiterations <idleiterations>] [-idleiterations <idleiterations>]
| COMMAND\_OPTION | Description | | COMMAND\_OPTION | Description |
@ -273,6 +274,7 @@ Usage:
| `-threshold` \<threshold\> | Percentage of disk capacity. This overwrites the default threshold. | | `-threshold` \<threshold\> | Percentage of disk capacity. This overwrites the default threshold. |
| `-exclude -f` \<hosts-file\> \| \<comma-separated list of hosts\> | Excludes the specified datanodes from being balanced by the balancer. | | `-exclude -f` \<hosts-file\> \| \<comma-separated list of hosts\> | Excludes the specified datanodes from being balanced by the balancer. |
| `-include -f` \<hosts-file\> \| \<comma-separated list of hosts\> | Includes only the specified datanodes to be balanced by the balancer. | | `-include -f` \<hosts-file\> \| \<comma-separated list of hosts\> | Includes only the specified datanodes to be balanced by the balancer. |
| `-blockpools` \<comma-separated list of blockpool ids\> | The balancer will only run on blockpools included in this list. |
| `-idleiterations` \<iterations\> | Maximum number of idle iterations before exit. This overwrites the default idleiterations(5). | | `-idleiterations` \<iterations\> | Maximum number of idle iterations before exit. This overwrites the default idleiterations(5). |
Runs a cluster balancing utility. An administrator can simply press Ctrl-C to stop the rebalancing process. See [Balancer](./HdfsUserGuide.html#Balancer) for more details. Runs a cluster balancing utility. An administrator can simply press Ctrl-C to stop the rebalancing process. See [Balancer](./HdfsUserGuide.html#Balancer) for more details.

View File

@ -644,7 +644,7 @@ public class TestBalancer {
Balancer.Parameters.DEFAULT.maxIdleIteration, Balancer.Parameters.DEFAULT.maxIdleIteration,
nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded(), nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded(),
Balancer.Parameters.DEFAULT.sourceNodes, Balancer.Parameters.DEFAULT.sourceNodes,
false); Balancer.Parameters.DEFAULT.blockpools, false);
} }
int expectedExcludedNodes = 0; int expectedExcludedNodes = 0;
@ -885,7 +885,7 @@ public class TestBalancer {
Balancer.Parameters.DEFAULT.maxIdleIteration, Balancer.Parameters.DEFAULT.maxIdleIteration,
datanodes, Balancer.Parameters.DEFAULT.includedNodes, datanodes, Balancer.Parameters.DEFAULT.includedNodes,
Balancer.Parameters.DEFAULT.sourceNodes, Balancer.Parameters.DEFAULT.sourceNodes,
false); Balancer.Parameters.DEFAULT.blockpools, false);
final int r = Balancer.run(namenodes, p, conf); final int r = Balancer.run(namenodes, p, conf);
assertEquals(ExitStatus.SUCCESS.getExitCode(), r); assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
} finally { } finally {
@ -1080,6 +1080,34 @@ public class TestBalancer {
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
} }
parameters = new String[] { "-blockpools" };
try {
Balancer.Cli.parse(parameters);
fail("IllegalArgumentException is expected when a value "
+ "is not specified for the blockpool flag");
} catch (IllegalArgumentException e) {
}
}
@Test
public void testBalancerCliParseBlockpools() {
String[] parameters = new String[] { "-blockpools", "bp-1,bp-2,bp-3" };
Balancer.Parameters p = Balancer.Cli.parse(parameters);
assertEquals(3, p.blockpools.size());
parameters = new String[] { "-blockpools", "bp-1" };
p = Balancer.Cli.parse(parameters);
assertEquals(1, p.blockpools.size());
parameters = new String[] { "-blockpools", "bp-1,,bp-2" };
p = Balancer.Cli.parse(parameters);
assertEquals(3, p.blockpools.size());
parameters = new String[] { "-blockpools", "bp-1," };
p = Balancer.Cli.parse(parameters);
assertEquals(1, p.blockpools.size());
} }
@ -1387,7 +1415,7 @@ public class TestBalancer {
Parameters.DEFAULT.excludedNodes, Parameters.DEFAULT.excludedNodes,
Parameters.DEFAULT.includedNodes, Parameters.DEFAULT.includedNodes,
Parameters.DEFAULT.sourceNodes, Parameters.DEFAULT.sourceNodes,
true); Balancer.Parameters.DEFAULT.blockpools, true);
assertEquals(ExitStatus.SUCCESS.getExitCode(), assertEquals(ExitStatus.SUCCESS.getExitCode(),
Balancer.run(namenodes, runDuringUpgrade, conf)); Balancer.run(namenodes, runDuringUpgrade, conf));
@ -1590,7 +1618,8 @@ public class TestBalancer {
BalancingPolicy.Node.INSTANCE, 1, BalancingPolicy.Node.INSTANCE, 1,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
Collections.<String> emptySet(), Collections.<String> emptySet(), Collections.<String> emptySet(), Collections.<String> emptySet(),
Collections.<String> emptySet(), false); Collections.<String> emptySet(),
Balancer.Parameters.DEFAULT.blockpools, false);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
final int r = Balancer.run(namenodes, p, conf); final int r = Balancer.run(namenodes, p, conf);
@ -1609,7 +1638,7 @@ public class TestBalancer {
BalancingPolicy.Node.INSTANCE, 1, BalancingPolicy.Node.INSTANCE, 1,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
Collections.<String> emptySet(), Collections.<String> emptySet(), Collections.<String> emptySet(), Collections.<String> emptySet(),
sourceNodes, false); sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
final int r = Balancer.run(namenodes, p, conf); final int r = Balancer.run(namenodes, p, conf);
@ -1624,7 +1653,7 @@ public class TestBalancer {
BalancingPolicy.Node.INSTANCE, 1, BalancingPolicy.Node.INSTANCE, 1,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
Collections.<String> emptySet(), Collections.<String> emptySet(), Collections.<String> emptySet(), Collections.<String> emptySet(),
sourceNodes, false); sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
final int r = Balancer.run(namenodes, p, conf); final int r = Balancer.run(namenodes, p, conf);
@ -1641,7 +1670,7 @@ public class TestBalancer {
BalancingPolicy.Node.INSTANCE, 1, BalancingPolicy.Node.INSTANCE, 1,
NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
Collections.<String> emptySet(), Collections.<String> emptySet(), Collections.<String> emptySet(), Collections.<String> emptySet(),
sourceNodes, false); sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false);
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1); conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
final int r = Balancer.run(namenodes, p, conf); final int r = Balancer.run(namenodes, p, conf);

View File

@ -21,8 +21,13 @@ import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -42,6 +47,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -60,6 +67,7 @@ public class TestBalancerWithMultipleNameNodes {
private static final long CAPACITY = 500L; private static final long CAPACITY = 500L;
private static final String RACK0 = "/rack0"; private static final String RACK0 = "/rack0";
private static final String RACK1 = "/rack1"; private static final String RACK1 = "/rack1";
private static final String RACK2 = "/rack2";
private static final String FILE_NAME = "/tmp.txt"; private static final String FILE_NAME = "/tmp.txt";
private static final Path FILE_PATH = new Path(FILE_NAME); private static final Path FILE_PATH = new Path(FILE_NAME);
@ -76,16 +84,20 @@ public class TestBalancerWithMultipleNameNodes {
final MiniDFSCluster cluster; final MiniDFSCluster cluster;
final ClientProtocol[] clients; final ClientProtocol[] clients;
final short replication; final short replication;
final Balancer.Parameters parameters;
Suite(MiniDFSCluster cluster, final int nNameNodes, final int nDataNodes, Suite(MiniDFSCluster cluster, final int nNameNodes, final int nDataNodes,
Configuration conf) throws IOException { Balancer.Parameters parameters, Configuration conf) throws IOException {
this.conf = conf; this.conf = conf;
this.cluster = cluster; this.cluster = cluster;
clients = new ClientProtocol[nNameNodes]; clients = new ClientProtocol[nNameNodes];
for(int i = 0; i < nNameNodes; i++) { for(int i = 0; i < nNameNodes; i++) {
clients[i] = cluster.getNameNode(i).getRpcServer(); clients[i] = cluster.getNameNode(i).getRpcServer();
} }
replication = (short)Math.max(1, nDataNodes - 1); // hard coding replication factor to 1 so logical and raw HDFS size are
// equal
replication = 1;
this.parameters = parameters;
} }
} }
@ -104,11 +116,9 @@ public class TestBalancerWithMultipleNameNodes {
) throws IOException, InterruptedException, TimeoutException { ) throws IOException, InterruptedException, TimeoutException {
final ExtendedBlock[][] blocks = new ExtendedBlock[s.clients.length][]; final ExtendedBlock[][] blocks = new ExtendedBlock[s.clients.length][];
for(int n = 0; n < s.clients.length; n++) { for(int n = 0; n < s.clients.length; n++) {
final long fileLen = size/s.replication; createFile(s, n, size);
createFile(s, n, fileLen); final List<LocatedBlock> locatedBlocks =
s.clients[n].getBlockLocations(FILE_NAME, 0, size).getLocatedBlocks();
final List<LocatedBlock> locatedBlocks = s.clients[n].getBlockLocations(
FILE_NAME, 0, fileLen).getLocatedBlocks();
final int numOfBlocks = locatedBlocks.size(); final int numOfBlocks = locatedBlocks.size();
blocks[n] = new ExtendedBlock[numOfBlocks]; blocks[n] = new ExtendedBlock[numOfBlocks];
@ -151,9 +161,14 @@ public class TestBalancerWithMultipleNameNodes {
wait(s.clients, totalUsed, totalCapacity); wait(s.clients, totalUsed, totalCapacity);
LOG.info("BALANCER 1"); LOG.info("BALANCER 1");
// get storage reports for relevant blockpools so that we can compare
// blockpool usages after balancer has run
Map<Integer, DatanodeStorageReport[]> preBalancerPoolUsages =
getStorageReports(s);
// start rebalancing // start rebalancing
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(s.conf); final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(s.conf);
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, s.conf); final int r = Balancer.run(namenodes, s.parameters, s.conf);
Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), r); Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
LOG.info("BALANCER 2"); LOG.info("BALANCER 2");
@ -189,7 +204,7 @@ public class TestBalancerWithMultipleNameNodes {
balanced = true; balanced = true;
for(int d = 0; d < used.length; d++) { for(int d = 0; d < used.length; d++) {
final double p = used[d]*100.0/cap[d]; final double p = used[d]*100.0/cap[d];
balanced = p <= avg + Balancer.Parameters.DEFAULT.threshold; balanced = p <= avg + s.parameters.threshold;
if (!balanced) { if (!balanced) {
if (i % 100 == 0) { if (i % 100 == 0) {
LOG.warn("datanodes " + d + " is not yet balanced: " LOG.warn("datanodes " + d + " is not yet balanced: "
@ -203,6 +218,89 @@ public class TestBalancerWithMultipleNameNodes {
} }
} }
LOG.info("BALANCER 6"); LOG.info("BALANCER 6");
// cluster is balanced, verify that only selected blockpools were touched
Map<Integer, DatanodeStorageReport[]> postBalancerPoolUsages =
getStorageReports(s);
Assert.assertEquals(preBalancerPoolUsages.size(),
postBalancerPoolUsages.size());
for (Map.Entry<Integer, DatanodeStorageReport[]> entry
: preBalancerPoolUsages.entrySet()) {
compareTotalPoolUsage(entry.getValue(),
postBalancerPoolUsages.get(entry.getKey()));
}
}
/**
* Compare the total blockpool usage on each datanode to ensure that nothing
* was balanced.
*
* @param preReports storage reports from pre balancer run
* @param postReports storage reports from post balancer run
*/
private static void compareTotalPoolUsage(DatanodeStorageReport[] preReports,
DatanodeStorageReport[] postReports) {
Assert.assertNotNull(preReports);
Assert.assertNotNull(postReports);
Assert.assertEquals(preReports.length, postReports.length);
for (DatanodeStorageReport preReport : preReports) {
String dnUuid = preReport.getDatanodeInfo().getDatanodeUuid();
for(DatanodeStorageReport postReport : postReports) {
if(postReport.getDatanodeInfo().getDatanodeUuid().equals(dnUuid)) {
Assert.assertEquals(getTotalPoolUsage(preReport),
getTotalPoolUsage(postReport));
LOG.info("Comparision of datanode pool usage pre/post balancer run. "
+ "PrePoolUsage: " + getTotalPoolUsage(preReport)
+ ", PostPoolUsage: " + getTotalPoolUsage(postReport));
break;
}
}
}
}
private static long getTotalPoolUsage(DatanodeStorageReport report) {
long usage = 0L;
for (StorageReport sr : report.getStorageReports()) {
usage += sr.getBlockPoolUsed();
}
return usage;
}
/**
* Get the storage reports for all blockpools that were not specified by the
* balancer blockpool parameters. If none were specified then the parameter
* was not set and do not return any reports.
*
* @param s suite for the test
* @return a map of storage reports where the key is the blockpool index
* @throws IOException
*/
private static Map<Integer,
DatanodeStorageReport[]> getStorageReports(Suite s) throws IOException {
Map<Integer, DatanodeStorageReport[]> reports =
new HashMap<Integer, DatanodeStorageReport[]>();
if (s.parameters.blockpools.size() == 0) {
// the blockpools parameter was not set, so we don't need to track any
// blockpools.
return Collections.emptyMap();
}
for (int i = 0; i < s.clients.length; i++) {
if (s.parameters.blockpools.contains(s.cluster.getNamesystem(i)
.getBlockPoolId())) {
// we want to ensure that blockpools not specified by the balancer
// parameters were left alone. Therefore, if the pool was specified,
// skip it. Note: this code assumes the clients in the suite are ordered
// the same way that they are indexed via cluster#getNamesystem(index).
continue;
} else {
LOG.info("Tracking usage of blockpool id: "
+ s.cluster.getNamesystem(i).getBlockPoolId());
reports.put(i,
s.clients[i].getDatanodeStorageReport(DatanodeReportType.LIVE));
}
}
LOG.info("Tracking " + reports.size()
+ " blockpool(s) for pre/post balancer usage.");
return reports;
} }
private static void sleep(long ms) { private static void sleep(long ms) {
@ -220,25 +318,31 @@ public class TestBalancerWithMultipleNameNodes {
} }
/** /**
* First start a cluster and fill the cluster up to a certain size. * First start a cluster and fill the cluster up to a certain size. Then
* Then redistribute blocks according the required distribution. * redistribute blocks according the required distribution. Finally, balance
* Finally, balance the cluster. * the cluster.
* *
* @param nNameNodes Number of NameNodes * @param nNameNodes Number of NameNodes
* @param distributionPerNN The distribution for each NameNode. * @param nNameNodesToBalance Number of NameNodes to run the balancer on
* @param distributionPerNN The distribution for each NameNode.
* @param capacities Capacities of the datanodes * @param capacities Capacities of the datanodes
* @param racks Rack names * @param racks Rack names
* @param conf Configuration * @param conf Configuration
*/ */
private void unevenDistribution(final int nNameNodes, private void unevenDistribution(final int nNameNodes,
long distributionPerNN[], long capacities[], String[] racks, final int nNameNodesToBalance, long distributionPerNN[],
Configuration conf) throws Exception { long capacities[], String[] racks, Configuration conf) throws Exception {
LOG.info("UNEVEN 0"); LOG.info("UNEVEN 0");
final int nDataNodes = distributionPerNN.length; final int nDataNodes = distributionPerNN.length;
if (capacities.length != nDataNodes || racks.length != nDataNodes) { if (capacities.length != nDataNodes || racks.length != nDataNodes) {
throw new IllegalArgumentException("Array length is not the same"); throw new IllegalArgumentException("Array length is not the same");
} }
if (nNameNodesToBalance > nNameNodes) {
throw new IllegalArgumentException("Number of namenodes to balance is "
+ "greater than the number of namenodes.");
}
// calculate total space that need to be filled // calculate total space that need to be filled
final long usedSpacePerNN = TestBalancer.sum(distributionPerNN); final long usedSpacePerNN = TestBalancer.sum(distributionPerNN);
@ -248,7 +352,7 @@ public class TestBalancerWithMultipleNameNodes {
LOG.info("UNEVEN 1"); LOG.info("UNEVEN 1");
final MiniDFSCluster cluster = new MiniDFSCluster final MiniDFSCluster cluster = new MiniDFSCluster
.Builder(new Configuration(conf)) .Builder(new Configuration(conf))
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2)) .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(nNameNodes))
.numDataNodes(nDataNodes) .numDataNodes(nDataNodes)
.racks(racks) .racks(racks)
.simulatedCapacities(capacities) .simulatedCapacities(capacities)
@ -258,7 +362,7 @@ public class TestBalancerWithMultipleNameNodes {
cluster.waitActive(); cluster.waitActive();
DFSTestUtil.setFederatedConfiguration(cluster, conf); DFSTestUtil.setFederatedConfiguration(cluster, conf);
LOG.info("UNEVEN 3"); LOG.info("UNEVEN 3");
final Suite s = new Suite(cluster, nNameNodes, nDataNodes, conf); final Suite s = new Suite(cluster, nNameNodes, nDataNodes, null, conf);
blocks = generateBlocks(s, usedSpacePerNN); blocks = generateBlocks(s, usedSpacePerNN);
LOG.info("UNEVEN 4"); LOG.info("UNEVEN 4");
} finally { } finally {
@ -280,7 +384,20 @@ public class TestBalancerWithMultipleNameNodes {
try { try {
cluster.waitActive(); cluster.waitActive();
LOG.info("UNEVEN 12"); LOG.info("UNEVEN 12");
final Suite s = new Suite(cluster, nNameNodes, nDataNodes, conf); Set<String> blockpools = new HashSet<String>();
for (int i = 0; i < nNameNodesToBalance; i++) {
blockpools.add(cluster.getNamesystem(i).getBlockPoolId());
}
Balancer.Parameters params =
new Balancer.Parameters(Balancer.Parameters.DEFAULT.policy,
Balancer.Parameters.DEFAULT.threshold,
Balancer.Parameters.DEFAULT.maxIdleIteration,
Balancer.Parameters.DEFAULT.excludedNodes,
Balancer.Parameters.DEFAULT.includedNodes,
Balancer.Parameters.DEFAULT.sourceNodes, blockpools,
Balancer.Parameters.DEFAULT.runDuringUpgrade);
final Suite s =
new Suite(cluster, nNameNodes, nDataNodes, params, conf);
for(int n = 0; n < nNameNodes; n++) { for(int n = 0; n < nNameNodes; n++) {
// redistribute blocks // redistribute blocks
final Block[][] blocksDN = TestBalancer.distributeBlocks( final Block[][] blocksDN = TestBalancer.distributeBlocks(
@ -336,7 +453,9 @@ public class TestBalancerWithMultipleNameNodes {
try { try {
cluster.waitActive(); cluster.waitActive();
LOG.info("RUN_TEST 1"); LOG.info("RUN_TEST 1");
final Suite s = new Suite(cluster, nNameNodes, nDataNodes, conf); final Suite s =
new Suite(cluster, nNameNodes, nDataNodes,
Balancer.Parameters.DEFAULT, conf);
long totalCapacity = TestBalancer.sum(capacities); long totalCapacity = TestBalancer.sum(capacities);
LOG.info("RUN_TEST 2"); LOG.info("RUN_TEST 2");
@ -378,10 +497,26 @@ public class TestBalancerWithMultipleNameNodes {
@Test @Test
public void testUnevenDistribution() throws Exception { public void testUnevenDistribution() throws Exception {
final Configuration conf = createConf(); final Configuration conf = createConf();
unevenDistribution(2, unevenDistribution(2, 2,
new long[] {30*CAPACITY/100, 5*CAPACITY/100}, new long[] {30*CAPACITY/100, 5*CAPACITY/100},
new long[]{CAPACITY, CAPACITY}, new long[]{CAPACITY, CAPACITY},
new String[] {RACK0, RACK1}, new String[] {RACK0, RACK1},
conf); conf);
} }
@Test
public void testBalancing1OutOf2Blockpools() throws Exception {
final Configuration conf = createConf();
unevenDistribution(2, 1, new long[] { 30 * CAPACITY / 100,
5 * CAPACITY / 100 }, new long[] { CAPACITY, CAPACITY }, new String[] {
RACK0, RACK1 }, conf);
}
@Test
public void testBalancing2OutOf3Blockpools() throws Exception {
final Configuration conf = createConf();
unevenDistribution(3, 2, new long[] { 30 * CAPACITY / 100,
5 * CAPACITY / 100, 10 * CAPACITY / 100 }, new long[] { CAPACITY,
CAPACITY, CAPACITY }, new String[] { RACK0, RACK1, RACK2 }, conf);
}
} }