HDFS-3495. Update Balancer to support new NetworkTopology with NodeGroup. Contributed by Junping Du

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1414874 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-11-28 19:19:27 +00:00
parent 5caef48947
commit 1634e980af
6 changed files with 711 additions and 66 deletions

View File

@ -17,6 +17,9 @@ Trunk (Unreleased)
reliably storing HDFS edit logs. See dedicated section below for breakdown
of subtasks.
HDFS-3495. Update Balancer to support new NetworkTopology with NodeGroup.
(Junping Du via szetszwo)
IMPROVEMENTS
HDFS-1620. Rename HdfsConstants -> HdfsServerConstants, FSConstants ->

View File

@ -168,7 +168,7 @@ import org.apache.hadoop.util.ToolRunner;
* <ol>
* <li>The cluster is balanced. Exiting
* <li>No block can be moved. Exiting...
* <li>No block has been moved for 3 iterations. Exiting...
* <li>No block has been moved for 5 iterations. Exiting...
* <li>Received an IO exception: failure reason. Exiting...
* <li>Another balancer is running. Exiting...
* </ol>
@ -222,7 +222,7 @@ public class Balancer {
private Map<String, BalancerDatanode> datanodes
= new HashMap<String, BalancerDatanode>();
private NetworkTopology cluster = new NetworkTopology();
private NetworkTopology cluster;
final static private int MOVER_THREAD_POOL_SIZE = 1000;
final private ExecutorService moverExecutor =
@ -293,22 +293,35 @@ public class Balancer {
* @return true if a proxy is found; otherwise false
*/
private boolean chooseProxySource() {
final DatanodeInfo targetDN = target.getDatanode();
boolean find = false;
for (BalancerDatanode loc : block.getLocations()) {
// check if there is replica which is on the same rack with the target
for (BalancerDatanode loc : block.getLocations()) {
if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
if (loc.addPendingBlock(this)) {
proxySource = loc;
if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
find = true;
// if cluster is not nodegroup aware or the proxy is on the same
// nodegroup with target, then we already find the nearest proxy
if (!cluster.isNodeGroupAware()
|| cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN)) {
return true;
}
}
if (!find) {
// find out a non-busy replica out of rack of target
find = addTo(loc);
}
// find out a non-busy replica
for (BalancerDatanode loc : block.getLocations()) {
if (loc.addPendingBlock(this)) {
proxySource = loc;
}
return find;
}
// add a BalancerDatanode as proxy source for specific block movement
private boolean addTo(BalancerDatanode bdn) {
if (bdn.addPendingBlock(this)) {
proxySource = bdn;
return true;
}
}
return false;
}
@ -787,9 +800,10 @@ public class Balancer {
*/
private static void checkReplicationPolicyCompatibility(Configuration conf
) throws UnsupportedActionException {
if (BlockPlacementPolicy.getInstance(conf, null, null).getClass() !=
BlockPlacementPolicyDefault.class) {
throw new UnsupportedActionException("Balancer without BlockPlacementPolicyDefault");
if (BlockPlacementPolicy.getInstance(conf, null, null) instanceof
BlockPlacementPolicyDefault) {
throw new UnsupportedActionException(
"Balancer without BlockPlacementPolicyDefault");
}
}
@ -804,6 +818,7 @@ public class Balancer {
this.threshold = p.threshold;
this.policy = p.policy;
this.nnc = theblockpool;
cluster = NetworkTopology.getInstance(conf);
}
/* Shuffle datanode array */
@ -914,9 +929,15 @@ public class Balancer {
* Return total number of bytes to move in this iteration
*/
private long chooseNodes() {
// Match nodes on the same rack first
// First, match nodes on the same node group if cluster has nodegroup
// awareness
if (cluster.isNodeGroupAware()) {
chooseNodesOnSameNodeGroup();
}
// Then, match nodes on the same rack
chooseNodes(true);
// Then match nodes on different racks
// At last, match nodes on different racks
chooseNodes(false);
assert (datanodes.size() >= sources.size()+targets.size())
@ -932,6 +953,102 @@ public class Balancer {
return bytesToMove;
}
/**
* Decide all <source, target> pairs where source and target are
* on the same NodeGroup
*/
private void chooseNodesOnSameNodeGroup() {
/* first step: match each overUtilized datanode (source) to
* one or more underUtilized datanodes within same NodeGroup(targets).
*/
chooseOnSameNodeGroup(overUtilizedDatanodes, underUtilizedDatanodes);
/* match each remaining overutilized datanode (source) to below average
* utilized datanodes within the same NodeGroup(targets).
* Note only overutilized datanodes that haven't had that max bytes to move
* satisfied in step 1 are selected
*/
chooseOnSameNodeGroup(overUtilizedDatanodes, belowAvgUtilizedDatanodes);
/* match each remaining underutilized datanode to above average utilized
* datanodes within the same NodeGroup.
* Note only underutilized datanodes that have not had that max bytes to
* move satisfied in step 1 are selected.
*/
chooseOnSameNodeGroup(underUtilizedDatanodes, aboveAvgUtilizedDatanodes);
}
/**
* Match two sets of nodes within the same NodeGroup, one should be source
* nodes (utilization > Avg), and the other should be destination nodes
* (utilization < Avg).
* @param datanodes
* @param candidates
*/
private <D extends BalancerDatanode, C extends BalancerDatanode> void
chooseOnSameNodeGroup(Collection<D> datanodes, Collection<C> candidates) {
for (Iterator<D> i = datanodes.iterator(); i.hasNext();) {
final D datanode = i.next();
for(; chooseOnSameNodeGroup(datanode, candidates.iterator()); );
if (!datanode.isMoveQuotaFull()) {
i.remove();
}
}
}
/**
* Match one datanode with a set of candidates nodes within the same NodeGroup.
*/
private <T extends BalancerDatanode> boolean chooseOnSameNodeGroup(
BalancerDatanode dn, Iterator<T> candidates) {
final T chosen = chooseCandidateOnSameNodeGroup(dn, candidates);
if (chosen == null) {
return false;
}
if (dn instanceof Source) {
matchSourceWithTargetToMove((Source)dn, chosen);
} else {
matchSourceWithTargetToMove((Source)chosen, dn);
}
if (!chosen.isMoveQuotaFull()) {
candidates.remove();
}
return true;
}
private void matchSourceWithTargetToMove(
Source source, BalancerDatanode target) {
long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
NodeTask nodeTask = new NodeTask(target, size);
source.addNodeTask(nodeTask);
target.incScheduledSize(nodeTask.getSize());
sources.add(source);
targets.add(target);
LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
+source.datanode.getName() + " to " + target.datanode.getName());
}
/** choose a datanode from <code>candidates</code> within the same NodeGroup
* of <code>dn</code>.
*/
private <T extends BalancerDatanode> T chooseCandidateOnSameNodeGroup(
BalancerDatanode dn, Iterator<T> candidates) {
if (dn.isMoveQuotaFull()) {
for(; candidates.hasNext(); ) {
final T c = candidates.next();
if (!c.isMoveQuotaFull()) {
candidates.remove();
continue;
}
if (cluster.isOnSameNodeGroup(dn.getDatanode(), c.getDatanode())) {
return c;
}
}
}
return null;
}
/* if onRack is true, decide all <source, target> pairs
* where source and target are on the same rack; Otherwise
* decide all <source, target> pairs where source and target are
@ -941,21 +1058,21 @@ public class Balancer {
/* first step: match each overUtilized datanode (source) to
* one or more underUtilized datanodes (targets).
*/
chooseTargets(underUtilizedDatanodes.iterator(), onRack);
chooseTargets(underUtilizedDatanodes, onRack);
/* match each remaining overutilized datanode (source) to
* below average utilized datanodes (targets).
* Note only overutilized datanodes that haven't had that max bytes to move
* satisfied in step 1 are selected
*/
chooseTargets(belowAvgUtilizedDatanodes.iterator(), onRack);
chooseTargets(belowAvgUtilizedDatanodes, onRack);
/* match each remaining underutilized datanode to
* above average utilized datanodes.
/* match each remaining underutilized datanode (target) to
* above average utilized datanodes (source).
* Note only underutilized datanodes that have not had that max bytes to
* move satisfied in step 1 are selected.
*/
chooseSources(aboveAvgUtilizedDatanodes.iterator(), onRack);
chooseSources(aboveAvgUtilizedDatanodes, onRack);
}
/* choose targets from the target candidate list for each over utilized
@ -963,11 +1080,11 @@ public class Balancer {
* should be on the same rack as the source
*/
private void chooseTargets(
Iterator<BalancerDatanode> targetCandidates, boolean onRackTarget ) {
Collection<BalancerDatanode> targetCandidates, boolean onRackTarget ) {
for (Iterator<Source> srcIterator = overUtilizedDatanodes.iterator();
srcIterator.hasNext();) {
Source source = srcIterator.next();
while (chooseTarget(source, targetCandidates, onRackTarget)) {
while (chooseTarget(source, targetCandidates.iterator(), onRackTarget)) {
}
if (!source.isMoveQuotaFull()) {
srcIterator.remove();
@ -981,11 +1098,11 @@ public class Balancer {
* should be on the same rack as the target
*/
private void chooseSources(
Iterator<Source> sourceCandidates, boolean onRackSource) {
Collection<Source> sourceCandidates, boolean onRackSource) {
for (Iterator<BalancerDatanode> targetIterator =
underUtilizedDatanodes.iterator(); targetIterator.hasNext();) {
BalancerDatanode target = targetIterator.next();
while (chooseSource(target, sourceCandidates, onRackSource)) {
while (chooseSource(target, sourceCandidates.iterator(), onRackSource)) {
}
if (!target.isMoveQuotaFull()) {
targetIterator.remove();
@ -1025,18 +1142,10 @@ public class Balancer {
}
if (foundTarget) {
assert(target != null):"Choose a null target";
long size = Math.min(source.availableSizeToMove(),
target.availableSizeToMove());
NodeTask nodeTask = new NodeTask(target, size);
source.addNodeTask(nodeTask);
target.incScheduledSize(nodeTask.getSize());
sources.add(source);
targets.add(target);
matchSourceWithTargetToMove(source, target);
if (!target.isMoveQuotaFull()) {
targetCandidates.remove();
}
LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
+source.datanode + " to " + target.datanode);
return true;
}
return false;
@ -1073,18 +1182,10 @@ public class Balancer {
}
if (foundSource) {
assert(source != null):"Choose a null source";
long size = Math.min(source.availableSizeToMove(),
target.availableSizeToMove());
NodeTask nodeTask = new NodeTask(target, size);
source.addNodeTask(nodeTask);
target.incScheduledSize(nodeTask.getSize());
sources.add(source);
targets.add(target);
matchSourceWithTargetToMove(source, target);
if ( !source.isMoveQuotaFull()) {
sourceCandidates.remove();
}
LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
+source.datanode + " to " + target.datanode);
return true;
}
return false;
@ -1226,6 +1327,10 @@ public class Balancer {
if (block.isLocatedOnDatanode(target)) {
return false;
}
if (cluster.isNodeGroupAware() &&
isOnSameNodeGroupWithReplicas(target, block, source)) {
return false;
}
boolean goodBlock = false;
if (cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
@ -1258,9 +1363,31 @@ public class Balancer {
return goodBlock;
}
/**
* Check if there are any replica (other than source) on the same node group
* with target. If true, then target is not a good candidate for placing
* specific block replica as we don't want 2 replicas under the same nodegroup
* after balance.
* @param target targetDataNode
* @param block dataBlock
* @param source sourceDataNode
* @return true if there are any replica (other than source) on the same node
* group with target
*/
private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode target,
BalancerBlock block, Source source) {
for (BalancerDatanode loc : block.locations) {
if (loc != source &&
cluster.isOnSameNodeGroup(loc.getDatanode(), target.getDatanode())) {
return true;
}
}
return false;
}
/* reset all fields in a balancer preparing for the next iteration */
private void resetData() {
this.cluster = new NetworkTopology();
private void resetData(Configuration conf) {
this.cluster = NetworkTopology.getInstance(conf);
this.overUtilizedDatanodes.clear();
this.aboveAvgUtilizedDatanodes.clear();
this.belowAvgUtilizedDatanodes.clear();
@ -1331,7 +1458,8 @@ public class Balancer {
}
/** Run an iteration for all datanodes. */
private ReturnStatus run(int iteration, Formatter formatter) {
private ReturnStatus run(int iteration, Formatter formatter,
Configuration conf) {
try {
/* get all live datanodes of a cluster and their disk usage
* decide the number of bytes need to be moved
@ -1385,7 +1513,7 @@ public class Balancer {
}
// clean all lists
resetData();
resetData(conf);
return ReturnStatus.IN_PROGRESS;
} catch (IllegalArgumentException e) {
System.out.println(e + ". Exiting ...");
@ -1433,7 +1561,7 @@ public class Balancer {
Collections.shuffle(connectors);
for(NameNodeConnector nnc : connectors) {
final Balancer b = new Balancer(nnc, p, conf);
final ReturnStatus r = b.run(iteration, formatter);
final ReturnStatus r = b.run(iteration, formatter, conf);
if (r == ReturnStatus.IN_PROGRESS) {
done = false;
} else if (r != ReturnStatus.SUCCESS) {

View File

@ -39,7 +39,6 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -151,10 +150,7 @@ public class DatanodeManager {
this.namesystem = namesystem;
this.blockManager = blockManager;
Class<? extends NetworkTopology> networkTopologyClass =
conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
NetworkTopology.class, NetworkTopology.class);
networktopology = ReflectionUtils.newInstance(networkTopologyClass, conf);
networktopology = NetworkTopology.getInstance(conf);
this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);

View File

@ -321,7 +321,7 @@ public class MiniDFSCluster {
/**
* Used by builder to create and return an instance of MiniDFSCluster
*/
private MiniDFSCluster(Builder builder) throws IOException {
protected MiniDFSCluster(Builder builder) throws IOException {
if (builder.nnTopology == null) {
// If no topology is specified, build a single NN.
builder.nnTopology = MiniDFSNNTopology.simpleSingleNN(
@ -369,8 +369,8 @@ public class MiniDFSCluster {
private Configuration conf;
private NameNodeInfo[] nameNodes;
private int numDataNodes;
private ArrayList<DataNodeProperties> dataNodes =
protected int numDataNodes;
protected List<DataNodeProperties> dataNodes =
new ArrayList<DataNodeProperties>();
private File base_dir;
private File data_dir;
@ -2303,7 +2303,7 @@ public class MiniDFSCluster {
return port;
}
private void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
boolean checkDataNodeAddrConfig) throws IOException {
if (setupHostsFile) {
String hostsFile = conf.get(DFS_HOSTS, "").trim();

View File

@ -0,0 +1,228 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
import java.io.File;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.StaticMapping;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.ssl.SSLFactory;
public class MiniDFSClusterWithNodeGroup extends MiniDFSCluster {
private static String[] NODE_GROUPS = null;
private static final Log LOG = LogFactory.getLog(MiniDFSClusterWithNodeGroup.class);
public MiniDFSClusterWithNodeGroup(Builder builder) throws IOException {
super(builder);
}
public static void setNodeGroups (String[] nodeGroups) {
NODE_GROUPS = nodeGroups;
}
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
boolean manageDfsDirs, StartupOption operation,
String[] racks, String[] nodeGroups, String[] hosts,
long[] simulatedCapacities,
boolean setupHostsFile,
boolean checkDataNodeAddrConfig,
boolean checkDataNodeHostConfig) throws IOException {
if (operation == StartupOption.RECOVER) {
return;
}
if (checkDataNodeHostConfig) {
conf.setIfUnset(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1");
} else {
conf.set(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1");
}
conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1");
int curDatanodesNum = dataNodes.size();
// for mincluster's the default initialDelay for BRs is 0
if (conf.get(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY) == null) {
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 0);
}
// If minicluster's name node is null assume that the conf has been
// set with the right address:port of the name node.
//
if (racks != null && numDataNodes > racks.length ) {
throw new IllegalArgumentException( "The length of racks [" + racks.length
+ "] is less than the number of datanodes [" + numDataNodes + "].");
}
if (nodeGroups != null && numDataNodes > nodeGroups.length ) {
throw new IllegalArgumentException( "The length of nodeGroups [" + nodeGroups.length
+ "] is less than the number of datanodes [" + numDataNodes + "].");
}
if (hosts != null && numDataNodes > hosts.length ) {
throw new IllegalArgumentException( "The length of hosts [" + hosts.length
+ "] is less than the number of datanodes [" + numDataNodes + "].");
}
//Generate some hostnames if required
if (racks != null && hosts == null) {
hosts = new String[numDataNodes];
for (int i = curDatanodesNum; i < curDatanodesNum + numDataNodes; i++) {
hosts[i - curDatanodesNum] = "host" + i + ".foo.com";
}
}
if (simulatedCapacities != null
&& numDataNodes > simulatedCapacities.length) {
throw new IllegalArgumentException( "The length of simulatedCapacities ["
+ simulatedCapacities.length
+ "] is less than the number of datanodes [" + numDataNodes + "].");
}
String [] dnArgs = (operation == null ||
operation != StartupOption.ROLLBACK) ?
null : new String[] {operation.getName()};
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
Configuration dnConf = new HdfsConfiguration(conf);
// Set up datanode address
setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
if (manageDfsDirs) {
File dir1 = getInstanceStorageDir(i, 0);
File dir2 = getInstanceStorageDir(i, 1);
dir1.mkdirs();
dir2.mkdirs();
if (!dir1.isDirectory() || !dir2.isDirectory()) {
throw new IOException("Mkdirs failed to create directory for DataNode "
+ i + ": " + dir1 + " or " + dir2);
}
String dirs = fileAsURI(dir1) + "," + fileAsURI(dir2);
dnConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
}
if (simulatedCapacities != null) {
SimulatedFSDataset.setFactory(dnConf);
dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
simulatedCapacities[i-curDatanodesNum]);
}
LOG.info("Starting DataNode " + i + " with "
+ DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY + ": "
+ dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
if (hosts != null) {
dnConf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, hosts[i - curDatanodesNum]);
LOG.info("Starting DataNode " + i + " with hostname set to: "
+ dnConf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY));
}
if (racks != null) {
String name = hosts[i - curDatanodesNum];
if (nodeGroups == null) {
LOG.info("Adding node with hostname : " + name + " to rack " +
racks[i-curDatanodesNum]);
StaticMapping.addNodeToRack(name,racks[i-curDatanodesNum]);
} else {
LOG.info("Adding node with hostname : " + name + " to serverGroup " +
nodeGroups[i-curDatanodesNum] + " and rack " +
racks[i-curDatanodesNum]);
StaticMapping.addNodeToRack(name,racks[i-curDatanodesNum] +
nodeGroups[i-curDatanodesNum]);
}
}
Configuration newconf = new HdfsConfiguration(dnConf); // save config
if (hosts != null) {
NetUtils.addStaticResolution(hosts[i - curDatanodesNum], "localhost");
}
SecureResources secureResources = null;
if (UserGroupInformation.isSecurityEnabled()) {
SSLFactory sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, dnConf);
try {
secureResources = SecureDataNodeStarter.getSecureResources(sslFactory, dnConf);
} catch (Exception ex) {
ex.printStackTrace();
}
}
DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf, secureResources);
if(dn == null)
throw new IOException("Cannot start DataNode in "
+ dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
//since the HDFS does things based on IP:port, we need to add the mapping
//for IP:port to rackId
String ipAddr = dn.getXferAddress().getAddress().getHostAddress();
if (racks != null) {
int port = dn.getXferAddress().getPort();
if (nodeGroups == null) {
LOG.info("Adding node with IP:port : " + ipAddr + ":" + port +
" to rack " + racks[i-curDatanodesNum]);
StaticMapping.addNodeToRack(ipAddr + ":" + port,
racks[i-curDatanodesNum]);
} else {
LOG.info("Adding node with IP:port : " + ipAddr + ":" + port + " to nodeGroup " +
nodeGroups[i-curDatanodesNum] + " and rack " + racks[i-curDatanodesNum]);
StaticMapping.addNodeToRack(ipAddr + ":" + port, racks[i-curDatanodesNum] +
nodeGroups[i-curDatanodesNum]);
}
}
dn.runDatanodeDaemon();
dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs, secureResources));
}
curDatanodesNum += numDataNodes;
this.numDataNodes += numDataNodes;
waitActive();
}
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
boolean manageDfsDirs, StartupOption operation,
String[] racks, String[] nodeGroups, String[] hosts,
long[] simulatedCapacities,
boolean setupHostsFile) throws IOException {
startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, nodeGroups,
hosts, simulatedCapacities, setupHostsFile, false, false);
}
public void startDataNodes(Configuration conf, int numDataNodes,
boolean manageDfsDirs, StartupOption operation,
String[] racks, long[] simulatedCapacities,
String[] nodeGroups) throws IOException {
startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, nodeGroups,
null, simulatedCapacities, false);
}
// This is for initialize from parent class.
@Override
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
boolean manageDfsDirs, StartupOption operation,
String[] racks, String[] hosts,
long[] simulatedCapacities,
boolean setupHostsFile,
boolean checkDataNodeAddrConfig,
boolean checkDataNodeHostConfig) throws IOException {
startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks,
NODE_GROUPS, hosts, simulatedCapacities, setupHostsFile,
checkDataNodeAddrConfig, checkDataNodeHostConfig);
}
}

View File

@ -0,0 +1,290 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.balancer;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSClusterWithNodeGroup;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.net.NetworkTopology;
import org.junit.Test;
/**
* This class tests if a balancer schedules tasks correctly.
*/
public class TestBalancerWithNodeGroup extends TestCase {
private static final Log LOG = LogFactory.getLog(
"org.apache.hadoop.hdfs.TestBalancerWithNodeGroup");
final private static long CAPACITY = 500L;
final private static String RACK0 = "/rack0";
final private static String RACK1 = "/rack1";
final private static String NODEGROUP0 = "/nodegroup0";
final private static String NODEGROUP1 = "/nodegroup1";
final private static String NODEGROUP2 = "/nodegroup2";
final static private String fileName = "/tmp.txt";
final static private Path filePath = new Path(fileName);
MiniDFSClusterWithNodeGroup cluster;
ClientProtocol client;
static final long TIMEOUT = 20000L; //msec
static final double CAPACITY_ALLOWED_VARIANCE = 0.005; // 0.5%
static final double BALANCE_ALLOWED_VARIANCE = 0.11; // 10%+delta
static final int DEFAULT_BLOCK_SIZE = 10;
static {
Balancer.setBlockMoveWaitTime(1000L) ;
}
static Configuration createConf() {
Configuration conf = new HdfsConfiguration();
TestBalancer.initConf(conf);
conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
"org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
conf.set("dfs.block.replicator.classname",
"org.apache.hadoop.hdfs.server.blockmanagement." +
"BlockPlacementPolicyWithNodeGroup");
return conf;
}
/**
* Wait until heartbeat gives expected results, within CAPACITY_ALLOWED_VARIANCE,
* summed over all nodes. Times out after TIMEOUT msec.
* @param expectedUsedSpace
* @param expectedTotalSpace
* @throws IOException - if getStats() fails
* @throws TimeoutException
*/
private void waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace)
throws IOException, TimeoutException {
long timeout = TIMEOUT;
long failtime = (timeout <= 0L) ? Long.MAX_VALUE
: System.currentTimeMillis() + timeout;
while (true) {
long[] status = client.getStats();
double totalSpaceVariance = Math.abs((double)status[0] - expectedTotalSpace)
/ expectedTotalSpace;
double usedSpaceVariance = Math.abs((double)status[1] - expectedUsedSpace)
/ expectedUsedSpace;
if (totalSpaceVariance < CAPACITY_ALLOWED_VARIANCE
&& usedSpaceVariance < CAPACITY_ALLOWED_VARIANCE)
break; //done
if (System.currentTimeMillis() > failtime) {
throw new TimeoutException("Cluster failed to reached expected values of "
+ "totalSpace (current: " + status[0]
+ ", expected: " + expectedTotalSpace
+ "), or usedSpace (current: " + status[1]
+ ", expected: " + expectedUsedSpace
+ "), in more than " + timeout + " msec.");
}
try {
Thread.sleep(100L);
} catch(InterruptedException ignored) {
}
}
}
/**
* Wait until balanced: each datanode gives utilization within
* BALANCE_ALLOWED_VARIANCE of average
* @throws IOException
* @throws TimeoutException
*/
private void waitForBalancer(long totalUsedSpace, long totalCapacity)
throws IOException, TimeoutException {
long timeout = TIMEOUT;
long failtime = (timeout <= 0L) ? Long.MAX_VALUE
: System.currentTimeMillis() + timeout;
final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
boolean balanced;
do {
DatanodeInfo[] datanodeReport =
client.getDatanodeReport(DatanodeReportType.ALL);
assertEquals(datanodeReport.length, cluster.getDataNodes().size());
balanced = true;
for (DatanodeInfo datanode : datanodeReport) {
double nodeUtilization = ((double)datanode.getDfsUsed())
/ datanode.getCapacity();
if (Math.abs(avgUtilization - nodeUtilization) >
BALANCE_ALLOWED_VARIANCE) {
balanced = false;
if (System.currentTimeMillis() > failtime) {
throw new TimeoutException(
"Rebalancing expected avg utilization to become "
+ avgUtilization + ", but on datanode " + datanode
+ " it remains at " + nodeUtilization
+ " after more than " + TIMEOUT + " msec.");
}
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
}
break;
}
}
} while (!balanced);
}
private void runBalancer(Configuration conf,
long totalUsedSpace, long totalCapacity) throws Exception {
waitForHeartBeat(totalUsedSpace, totalCapacity);
// start rebalancing
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
waitForHeartBeat(totalUsedSpace, totalCapacity);
LOG.info("Rebalancing with default factor.");
waitForBalancer(totalUsedSpace, totalCapacity);
}
/**
* Create a cluster with even distribution, and a new empty node is added to
* the cluster, then test rack locality for balancer policy.
*/
@Test
public void testBalancerWithRackLocality() throws Exception {
Configuration conf = createConf();
long[] capacities = new long[]{CAPACITY, CAPACITY};
String[] racks = new String[]{RACK0, RACK1};
String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP1};
int numOfDatanodes = capacities.length;
assertEquals(numOfDatanodes, racks.length);
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
.numDataNodes(capacities.length)
.racks(racks)
.simulatedCapacities(capacities);
MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
cluster = new MiniDFSClusterWithNodeGroup(builder);
try {
cluster.waitActive();
client = NameNodeProxies.createProxy(conf,
cluster.getFileSystem(0).getUri(),
ClientProtocol.class).getProxy();
long totalCapacity = TestBalancer.sum(capacities);
// fill up the cluster to be 30% full
long totalUsedSpace = totalCapacity * 3 / 10;
TestBalancer.createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
(short) numOfDatanodes, 0);
long newCapacity = CAPACITY;
String newRack = RACK1;
String newNodeGroup = NODEGROUP2;
// start up an empty node with the same capacity and on the same rack
cluster.startDataNodes(conf, 1, true, null, new String[]{newRack},
new long[] {newCapacity}, new String[]{newNodeGroup});
totalCapacity += newCapacity;
// run balancer and validate results
runBalancer(conf, totalUsedSpace, totalCapacity);
DatanodeInfo[] datanodeReport =
client.getDatanodeReport(DatanodeReportType.ALL);
Map<String, Integer> rackToUsedCapacity = new HashMap<String, Integer>();
for (DatanodeInfo datanode: datanodeReport) {
String rack = NetworkTopology.getFirstHalf(datanode.getNetworkLocation());
int usedCapacity = (int) datanode.getDfsUsed();
if (rackToUsedCapacity.get(rack) != null) {
rackToUsedCapacity.put(rack, usedCapacity + rackToUsedCapacity.get(rack));
} else {
rackToUsedCapacity.put(rack, usedCapacity);
}
}
assertEquals(rackToUsedCapacity.size(), 2);
assertEquals(rackToUsedCapacity.get(RACK0), rackToUsedCapacity.get(RACK1));
} finally {
cluster.shutdown();
}
}
/**
* Create a cluster with even distribution, and a new empty node is added to
* the cluster, then test node-group locality for balancer policy.
*/
@Test
public void testBalancerWithNodeGroup() throws Exception {
Configuration conf = createConf();
long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY};
String[] racks = new String[]{RACK0, RACK0, RACK1, RACK1};
String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP0, NODEGROUP1, NODEGROUP2};
int numOfDatanodes = capacities.length;
assertEquals(numOfDatanodes, racks.length);
assertEquals(numOfDatanodes, nodeGroups.length);
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
.numDataNodes(capacities.length)
.racks(racks)
.simulatedCapacities(capacities);
MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
cluster = new MiniDFSClusterWithNodeGroup(builder);
try {
cluster.waitActive();
client = NameNodeProxies.createProxy(conf,
cluster.getFileSystem(0).getUri(),
ClientProtocol.class).getProxy();
long totalCapacity = TestBalancer.sum(capacities);
// fill up the cluster to be 20% full
long totalUsedSpace = totalCapacity * 2 / 10;
TestBalancer.createFile(cluster, filePath, totalUsedSpace / (numOfDatanodes/2),
(short) (numOfDatanodes/2), 0);
long newCapacity = CAPACITY;
String newRack = RACK1;
String newNodeGroup = NODEGROUP2;
// start up an empty node with the same capacity and on NODEGROUP2
cluster.startDataNodes(conf, 1, true, null, new String[]{newRack},
new long[] {newCapacity}, new String[]{newNodeGroup});
totalCapacity += newCapacity;
// run balancer and validate results
runBalancer(conf, totalUsedSpace, totalCapacity);
} finally {
cluster.shutdown();
}
}
}