diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index 7d983788cbf..1c468d2ad20 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -29,6 +29,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.util.ReflectionUtils;
/** The class represents a cluster of computer with a tree hierarchical
* network topology.
@@ -53,6 +56,19 @@ public class NetworkTopology {
super(msg);
}
}
+
+ /**
+ * Get an instance of NetworkTopology based on the value of the configuration
+ * parameter net.topology.impl.
+ *
+ * @param conf the configuration to be used
+ * @return an instance of NetworkTopology
+ */
+ public static NetworkTopology getInstance(Configuration conf){
+ return ReflectionUtils.newInstance(
+ conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
+ NetworkTopology.class, NetworkTopology.class), conf);
+ }
/** InnerNode represents a switch/router of a data center or rack.
* Different from a leaf node, it has non-null children.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 5b08a7117cd..3202417573b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -43,6 +43,9 @@ Release 2.1.0-beta - UNRELEASED
HDFS-3601. Add BlockPlacementPolicyWithNodeGroup to support block placement
with 4-layer network topology. (Junping Du via szetszwo)
+ HDFS-3495. Update Balancer to support new NetworkTopology with NodeGroup.
+ (Junping Du via szetszwo)
+
IMPROVEMENTS
HDFS-4222. NN is unresponsive and loses heartbeats from DNs when
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 1b6b7cd4a1e..c22b044f21c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -169,7 +169,7 @@ import org.apache.hadoop.util.ToolRunner;
*
* The cluster is balanced. Exiting
* No block can be moved. Exiting...
- * No block has been moved for 3 iterations. Exiting...
+ * No block has been moved for 5 iterations. Exiting...
* Received an IO exception: failure reason. Exiting...
* Another balancer is running. Exiting...
*
@@ -223,7 +223,7 @@ public class Balancer {
private Map datanodes
= new HashMap();
- private NetworkTopology cluster = new NetworkTopology();
+ private NetworkTopology cluster;
final static private int MOVER_THREAD_POOL_SIZE = 1000;
final private ExecutorService moverExecutor =
@@ -250,7 +250,7 @@ public class Balancer {
* Return true if a block and its proxy are chosen; false otherwise
*/
private boolean chooseBlockAndProxy() {
- // iterate all source's blocks until find a good one
+ // iterate all source's blocks until find a good one
for (Iterator blocks=
source.getBlockIterator(); blocks.hasNext();) {
if (markMovedIfGoodBlock(blocks.next())) {
@@ -294,22 +294,35 @@ public class Balancer {
* @return true if a proxy is found; otherwise false
*/
private boolean chooseProxySource() {
- // check if there is replica which is on the same rack with the target
+ final DatanodeInfo targetDN = target.getDatanode();
+ boolean find = false;
for (BalancerDatanode loc : block.getLocations()) {
- if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
- if (loc.addPendingBlock(this)) {
- proxySource = loc;
+ // check if there is replica which is on the same rack with the target
+ if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) {
+ find = true;
+ // if cluster is not nodegroup aware or the proxy is on the same
+ // nodegroup with target, then we already find the nearest proxy
+ if (!cluster.isNodeGroupAware()
+ || cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN)) {
return true;
}
}
- }
- // find out a non-busy replica
- for (BalancerDatanode loc : block.getLocations()) {
- if (loc.addPendingBlock(this)) {
- proxySource = loc;
- return true;
+
+ if (!find) {
+ // find out a non-busy replica out of rack of target
+ find = addTo(loc);
}
}
+
+ return find;
+ }
+
+ // add a BalancerDatanode as proxy source for specific block movement
+ private boolean addTo(BalancerDatanode bdn) {
+ if (bdn.addPendingBlock(this)) {
+ proxySource = bdn;
+ return true;
+ }
return false;
}
@@ -687,7 +700,7 @@ public class Balancer {
NodeTask task = tasks.next();
BalancerDatanode target = task.getDatanode();
PendingBlockMove pendingBlock = new PendingBlockMove();
- if ( target.addPendingBlock(pendingBlock) ) {
+ if (target.addPendingBlock(pendingBlock)) {
// target is not busy, so do a tentative block allocation
pendingBlock.source = this;
pendingBlock.target = target;
@@ -788,9 +801,10 @@ public class Balancer {
*/
private static void checkReplicationPolicyCompatibility(Configuration conf
) throws UnsupportedActionException {
- if (BlockPlacementPolicy.getInstance(conf, null, null).getClass() !=
- BlockPlacementPolicyDefault.class) {
- throw new UnsupportedActionException("Balancer without BlockPlacementPolicyDefault");
+ if (BlockPlacementPolicy.getInstance(conf, null, null) instanceof
+ BlockPlacementPolicyDefault) {
+ throw new UnsupportedActionException(
+ "Balancer without BlockPlacementPolicyDefault");
}
}
@@ -805,6 +819,7 @@ public class Balancer {
this.threshold = p.threshold;
this.policy = p.policy;
this.nnc = theblockpool;
+ cluster = NetworkTopology.getInstance(conf);
}
/* Shuffle datanode array */
@@ -915,9 +930,15 @@ public class Balancer {
* Return total number of bytes to move in this iteration
*/
private long chooseNodes() {
- // Match nodes on the same rack first
+ // First, match nodes on the same node group if cluster has nodegroup
+ // awareness
+ if (cluster.isNodeGroupAware()) {
+ chooseNodesOnSameNodeGroup();
+ }
+
+ // Then, match nodes on the same rack
chooseNodes(true);
- // Then match nodes on different racks
+ // At last, match nodes on different racks
chooseNodes(false);
assert (datanodes.size() >= sources.size()+targets.size())
@@ -932,6 +953,102 @@ public class Balancer {
}
return bytesToMove;
}
+
+ /**
+ * Decide all pairs where source and target are
+ * on the same NodeGroup
+ */
+ private void chooseNodesOnSameNodeGroup() {
+
+ /* first step: match each overUtilized datanode (source) to
+ * one or more underUtilized datanodes within same NodeGroup(targets).
+ */
+ chooseOnSameNodeGroup(overUtilizedDatanodes, underUtilizedDatanodes);
+
+ /* match each remaining overutilized datanode (source) to below average
+ * utilized datanodes within the same NodeGroup(targets).
+ * Note only overutilized datanodes that haven't had that max bytes to move
+ * satisfied in step 1 are selected
+ */
+ chooseOnSameNodeGroup(overUtilizedDatanodes, belowAvgUtilizedDatanodes);
+
+ /* match each remaining underutilized datanode to above average utilized
+ * datanodes within the same NodeGroup.
+ * Note only underutilized datanodes that have not had that max bytes to
+ * move satisfied in step 1 are selected.
+ */
+ chooseOnSameNodeGroup(underUtilizedDatanodes, aboveAvgUtilizedDatanodes);
+ }
+
+ /**
+ * Match two sets of nodes within the same NodeGroup, one should be source
+ * nodes (utilization > Avg), and the other should be destination nodes
+ * (utilization < Avg).
+ * @param datanodes
+ * @param candidates
+ */
+ private void
+ chooseOnSameNodeGroup(Collection datanodes, Collection candidates) {
+ for (Iterator i = datanodes.iterator(); i.hasNext();) {
+ final D datanode = i.next();
+ for(; chooseOnSameNodeGroup(datanode, candidates.iterator()); );
+ if (!datanode.isMoveQuotaFull()) {
+ i.remove();
+ }
+ }
+ }
+
+ /**
+ * Match one datanode with a set of candidates nodes within the same NodeGroup.
+ */
+ private boolean chooseOnSameNodeGroup(
+ BalancerDatanode dn, Iterator candidates) {
+ final T chosen = chooseCandidateOnSameNodeGroup(dn, candidates);
+ if (chosen == null) {
+ return false;
+ }
+ if (dn instanceof Source) {
+ matchSourceWithTargetToMove((Source)dn, chosen);
+ } else {
+ matchSourceWithTargetToMove((Source)chosen, dn);
+ }
+ if (!chosen.isMoveQuotaFull()) {
+ candidates.remove();
+ }
+ return true;
+ }
+
+ private void matchSourceWithTargetToMove(
+ Source source, BalancerDatanode target) {
+ long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
+ NodeTask nodeTask = new NodeTask(target, size);
+ source.addNodeTask(nodeTask);
+ target.incScheduledSize(nodeTask.getSize());
+ sources.add(source);
+ targets.add(target);
+ LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
+ +source.datanode.getName() + " to " + target.datanode.getName());
+ }
+
+ /** choose a datanode from candidates
within the same NodeGroup
+ * of dn
.
+ */
+ private T chooseCandidateOnSameNodeGroup(
+ BalancerDatanode dn, Iterator candidates) {
+ if (dn.isMoveQuotaFull()) {
+ for(; candidates.hasNext(); ) {
+ final T c = candidates.next();
+ if (!c.isMoveQuotaFull()) {
+ candidates.remove();
+ continue;
+ }
+ if (cluster.isOnSameNodeGroup(dn.getDatanode(), c.getDatanode())) {
+ return c;
+ }
+ }
+ }
+ return null;
+ }
/* if onRack is true, decide all pairs
* where source and target are on the same rack; Otherwise
@@ -942,33 +1059,33 @@ public class Balancer {
/* first step: match each overUtilized datanode (source) to
* one or more underUtilized datanodes (targets).
*/
- chooseTargets(underUtilizedDatanodes.iterator(), onRack);
+ chooseTargets(underUtilizedDatanodes, onRack);
/* match each remaining overutilized datanode (source) to
* below average utilized datanodes (targets).
* Note only overutilized datanodes that haven't had that max bytes to move
* satisfied in step 1 are selected
*/
- chooseTargets(belowAvgUtilizedDatanodes.iterator(), onRack);
+ chooseTargets(belowAvgUtilizedDatanodes, onRack);
- /* match each remaining underutilized datanode to
- * above average utilized datanodes.
+ /* match each remaining underutilized datanode (target) to
+ * above average utilized datanodes (source).
* Note only underutilized datanodes that have not had that max bytes to
* move satisfied in step 1 are selected.
*/
- chooseSources(aboveAvgUtilizedDatanodes.iterator(), onRack);
+ chooseSources(aboveAvgUtilizedDatanodes, onRack);
}
/* choose targets from the target candidate list for each over utilized
* source datanode. OnRackTarget determines if the chosen target
* should be on the same rack as the source
*/
- private void chooseTargets(
- Iterator targetCandidates, boolean onRackTarget ) {
+ private void chooseTargets(
+ Collection targetCandidates, boolean onRackTarget ) {
for (Iterator srcIterator = overUtilizedDatanodes.iterator();
srcIterator.hasNext();) {
Source source = srcIterator.next();
- while (chooseTarget(source, targetCandidates, onRackTarget)) {
+ while (chooseTarget(source, targetCandidates.iterator(), onRackTarget)) {
}
if (!source.isMoveQuotaFull()) {
srcIterator.remove();
@@ -982,11 +1099,11 @@ public class Balancer {
* should be on the same rack as the target
*/
private void chooseSources(
- Iterator sourceCandidates, boolean onRackSource) {
+ Collection sourceCandidates, boolean onRackSource) {
for (Iterator targetIterator =
underUtilizedDatanodes.iterator(); targetIterator.hasNext();) {
BalancerDatanode target = targetIterator.next();
- while (chooseSource(target, sourceCandidates, onRackSource)) {
+ while (chooseSource(target, sourceCandidates.iterator(), onRackSource)) {
}
if (!target.isMoveQuotaFull()) {
targetIterator.remove();
@@ -1026,23 +1143,15 @@ public class Balancer {
}
if (foundTarget) {
assert(target != null):"Choose a null target";
- long size = Math.min(source.availableSizeToMove(),
- target.availableSizeToMove());
- NodeTask nodeTask = new NodeTask(target, size);
- source.addNodeTask(nodeTask);
- target.incScheduledSize(nodeTask.getSize());
- sources.add(source);
- targets.add(target);
+ matchSourceWithTargetToMove(source, target);
if (!target.isMoveQuotaFull()) {
targetCandidates.remove();
}
- LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
- +source.datanode + " to " + target.datanode);
return true;
}
return false;
}
-
+
/* For the given target, choose sources from the source candidate list.
* OnRackSource determines if the chosen source
* should be on the same rack as the target
@@ -1074,18 +1183,10 @@ public class Balancer {
}
if (foundSource) {
assert(source != null):"Choose a null source";
- long size = Math.min(source.availableSizeToMove(),
- target.availableSizeToMove());
- NodeTask nodeTask = new NodeTask(target, size);
- source.addNodeTask(nodeTask);
- target.incScheduledSize(nodeTask.getSize());
- sources.add(source);
- targets.add(target);
+ matchSourceWithTargetToMove(source, target);
if ( !source.isMoveQuotaFull()) {
- sourceCandidates.remove();
- }
- LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
- +source.datanode + " to " + target.datanode);
+ sourceCandidates.remove();
+ }
return true;
}
return false;
@@ -1227,6 +1328,10 @@ public class Balancer {
if (block.isLocatedOnDatanode(target)) {
return false;
}
+ if (cluster.isNodeGroupAware() &&
+ isOnSameNodeGroupWithReplicas(target, block, source)) {
+ return false;
+ }
boolean goodBlock = false;
if (cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
@@ -1258,10 +1363,32 @@ public class Balancer {
}
return goodBlock;
}
-
+
+ /**
+ * Check if there are any replica (other than source) on the same node group
+ * with target. If true, then target is not a good candidate for placing
+ * specific block replica as we don't want 2 replicas under the same nodegroup
+ * after balance.
+ * @param target targetDataNode
+ * @param block dataBlock
+ * @param source sourceDataNode
+ * @return true if there are any replica (other than source) on the same node
+ * group with target
+ */
+ private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode target,
+ BalancerBlock block, Source source) {
+ for (BalancerDatanode loc : block.locations) {
+ if (loc != source &&
+ cluster.isOnSameNodeGroup(loc.getDatanode(), target.getDatanode())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/* reset all fields in a balancer preparing for the next iteration */
- private void resetData() {
- this.cluster = new NetworkTopology();
+ private void resetData(Configuration conf) {
+ this.cluster = NetworkTopology.getInstance(conf);
this.overUtilizedDatanodes.clear();
this.aboveAvgUtilizedDatanodes.clear();
this.belowAvgUtilizedDatanodes.clear();
@@ -1333,7 +1460,8 @@ public class Balancer {
}
/** Run an iteration for all datanodes. */
- private ReturnStatus run(int iteration, Formatter formatter) {
+ private ReturnStatus run(int iteration, Formatter formatter,
+ Configuration conf) {
try {
/* get all live datanodes of a cluster and their disk usage
* decide the number of bytes need to be moved
@@ -1387,7 +1515,7 @@ public class Balancer {
}
// clean all lists
- resetData();
+ resetData(conf);
return ReturnStatus.IN_PROGRESS;
} catch (IllegalArgumentException e) {
System.out.println(e + ". Exiting ...");
@@ -1435,7 +1563,7 @@ public class Balancer {
Collections.shuffle(connectors);
for(NameNodeConnector nnc : connectors) {
final Balancer b = new Balancer(nnc, p, conf);
- final ReturnStatus r = b.run(iteration, formatter);
+ final ReturnStatus r = b.run(iteration, formatter, conf);
if (r == ReturnStatus.IN_PROGRESS) {
done = false;
} else if (r != ReturnStatus.SUCCESS) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 66a1aee6b8b..d20c50b0e18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -163,11 +162,7 @@ public class DatanodeManager {
this.namesystem = namesystem;
this.blockManager = blockManager;
- Class extends NetworkTopology> networkTopologyClass =
- conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
- NetworkTopology.class, NetworkTopology.class);
- networktopology = (NetworkTopology) ReflectionUtils.newInstance(
- networkTopologyClass, conf);
+ networktopology = NetworkTopology.getInstance(conf);
this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 1092d67449f..2d4fbdd8afd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -320,7 +320,7 @@ public class MiniDFSCluster {
/**
* Used by builder to create and return an instance of MiniDFSCluster
*/
- private MiniDFSCluster(Builder builder) throws IOException {
+ protected MiniDFSCluster(Builder builder) throws IOException {
if (builder.nnTopology == null) {
// If no topology is specified, build a single NN.
builder.nnTopology = MiniDFSNNTopology.simpleSingleNN(
@@ -368,7 +368,7 @@ public class MiniDFSCluster {
private Configuration conf;
private NameNodeInfo[] nameNodes;
- private int numDataNodes;
+ protected int numDataNodes;
protected ArrayList dataNodes =
new ArrayList();
private File base_dir;
@@ -2318,7 +2318,7 @@ public class MiniDFSCluster {
return nameNodes[nnIndex].nameNode;
}
- private void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
+ protected void setupDatanodeAddress(Configuration conf, boolean setupHostsFile,
boolean checkDataNodeAddrConfig) throws IOException {
if (setupHostsFile) {
String hostsFile = conf.get(DFS_HOSTS, "").trim();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java
new file mode 100644
index 00000000000..ff8c92a88a3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
+import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter;
+import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.StaticMapping;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.ssl.SSLFactory;
+
+public class MiniDFSClusterWithNodeGroup extends MiniDFSCluster {
+
+ private static String[] NODE_GROUPS = null;
+ private static final Log LOG = LogFactory.getLog(MiniDFSClusterWithNodeGroup.class);
+
+ public MiniDFSClusterWithNodeGroup(Builder builder) throws IOException {
+ super(builder);
+ }
+
+ public static void setNodeGroups (String[] nodeGroups) {
+ NODE_GROUPS = nodeGroups;
+ }
+
+ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
+ boolean manageDfsDirs, StartupOption operation,
+ String[] racks, String[] nodeGroups, String[] hosts,
+ long[] simulatedCapacities,
+ boolean setupHostsFile,
+ boolean checkDataNodeAddrConfig,
+ boolean checkDataNodeHostConfig) throws IOException {
+ if (operation == StartupOption.RECOVER) {
+ return;
+ }
+ if (checkDataNodeHostConfig) {
+ conf.setIfUnset(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1");
+ } else {
+ conf.set(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1");
+ }
+ conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1");
+
+ int curDatanodesNum = dataNodes.size();
+ // for mincluster's the default initialDelay for BRs is 0
+ if (conf.get(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY) == null) {
+ conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 0);
+ }
+ // If minicluster's name node is null assume that the conf has been
+ // set with the right address:port of the name node.
+ //
+ if (racks != null && numDataNodes > racks.length ) {
+ throw new IllegalArgumentException( "The length of racks [" + racks.length
+ + "] is less than the number of datanodes [" + numDataNodes + "].");
+ }
+
+ if (nodeGroups != null && numDataNodes > nodeGroups.length ) {
+ throw new IllegalArgumentException( "The length of nodeGroups [" + nodeGroups.length
+ + "] is less than the number of datanodes [" + numDataNodes + "].");
+ }
+
+ if (hosts != null && numDataNodes > hosts.length ) {
+ throw new IllegalArgumentException( "The length of hosts [" + hosts.length
+ + "] is less than the number of datanodes [" + numDataNodes + "].");
+ }
+ //Generate some hostnames if required
+ if (racks != null && hosts == null) {
+ hosts = new String[numDataNodes];
+ for (int i = curDatanodesNum; i < curDatanodesNum + numDataNodes; i++) {
+ hosts[i - curDatanodesNum] = "host" + i + ".foo.com";
+ }
+ }
+
+ if (simulatedCapacities != null
+ && numDataNodes > simulatedCapacities.length) {
+ throw new IllegalArgumentException( "The length of simulatedCapacities ["
+ + simulatedCapacities.length
+ + "] is less than the number of datanodes [" + numDataNodes + "].");
+ }
+
+ String [] dnArgs = (operation == null ||
+ operation != StartupOption.ROLLBACK) ?
+ null : new String[] {operation.getName()};
+
+ for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) {
+ Configuration dnConf = new HdfsConfiguration(conf);
+ // Set up datanode address
+ setupDatanodeAddress(dnConf, setupHostsFile, checkDataNodeAddrConfig);
+ if (manageDfsDirs) {
+ File dir1 = getInstanceStorageDir(i, 0);
+ File dir2 = getInstanceStorageDir(i, 1);
+ dir1.mkdirs();
+ dir2.mkdirs();
+ if (!dir1.isDirectory() || !dir2.isDirectory()) {
+ throw new IOException("Mkdirs failed to create directory for DataNode "
+ + i + ": " + dir1 + " or " + dir2);
+ }
+ String dirs = fileAsURI(dir1) + "," + fileAsURI(dir2);
+ dnConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
+ conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
+ }
+ if (simulatedCapacities != null) {
+ SimulatedFSDataset.setFactory(dnConf);
+ dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
+ simulatedCapacities[i-curDatanodesNum]);
+ }
+ LOG.info("Starting DataNode " + i + " with "
+ + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY + ": "
+ + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+ if (hosts != null) {
+ dnConf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, hosts[i - curDatanodesNum]);
+ LOG.info("Starting DataNode " + i + " with hostname set to: "
+ + dnConf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY));
+ }
+ if (racks != null) {
+ String name = hosts[i - curDatanodesNum];
+ if (nodeGroups == null) {
+ LOG.info("Adding node with hostname : " + name + " to rack " +
+ racks[i-curDatanodesNum]);
+ StaticMapping.addNodeToRack(name,racks[i-curDatanodesNum]);
+ } else {
+ LOG.info("Adding node with hostname : " + name + " to serverGroup " +
+ nodeGroups[i-curDatanodesNum] + " and rack " +
+ racks[i-curDatanodesNum]);
+ StaticMapping.addNodeToRack(name,racks[i-curDatanodesNum] +
+ nodeGroups[i-curDatanodesNum]);
+ }
+ }
+ Configuration newconf = new HdfsConfiguration(dnConf); // save config
+ if (hosts != null) {
+ NetUtils.addStaticResolution(hosts[i - curDatanodesNum], "localhost");
+ }
+
+ SecureResources secureResources = null;
+ if (UserGroupInformation.isSecurityEnabled()) {
+ SSLFactory sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, dnConf);
+ try {
+ secureResources = SecureDataNodeStarter.getSecureResources(sslFactory, dnConf);
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ }
+ DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf, secureResources);
+ if(dn == null)
+ throw new IOException("Cannot start DataNode in "
+ + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+ //since the HDFS does things based on IP:port, we need to add the mapping
+ //for IP:port to rackId
+ String ipAddr = dn.getXferAddress().getAddress().getHostAddress();
+ if (racks != null) {
+ int port = dn.getXferAddress().getPort();
+ if (nodeGroups == null) {
+ LOG.info("Adding node with IP:port : " + ipAddr + ":" + port +
+ " to rack " + racks[i-curDatanodesNum]);
+ StaticMapping.addNodeToRack(ipAddr + ":" + port,
+ racks[i-curDatanodesNum]);
+ } else {
+ LOG.info("Adding node with IP:port : " + ipAddr + ":" + port + " to nodeGroup " +
+ nodeGroups[i-curDatanodesNum] + " and rack " + racks[i-curDatanodesNum]);
+ StaticMapping.addNodeToRack(ipAddr + ":" + port, racks[i-curDatanodesNum] +
+ nodeGroups[i-curDatanodesNum]);
+ }
+ }
+ dn.runDatanodeDaemon();
+ dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs, secureResources));
+ }
+ curDatanodesNum += numDataNodes;
+ this.numDataNodes += numDataNodes;
+ waitActive();
+ }
+
+ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
+ boolean manageDfsDirs, StartupOption operation,
+ String[] racks, String[] nodeGroups, String[] hosts,
+ long[] simulatedCapacities,
+ boolean setupHostsFile) throws IOException {
+ startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, nodeGroups,
+ hosts, simulatedCapacities, setupHostsFile, false, false);
+ }
+
+ public void startDataNodes(Configuration conf, int numDataNodes,
+ boolean manageDfsDirs, StartupOption operation,
+ String[] racks, long[] simulatedCapacities,
+ String[] nodeGroups) throws IOException {
+ startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, nodeGroups,
+ null, simulatedCapacities, false);
+ }
+
+ // This is for initialize from parent class.
+ @Override
+ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
+ boolean manageDfsDirs, StartupOption operation,
+ String[] racks, String[] hosts,
+ long[] simulatedCapacities,
+ boolean setupHostsFile,
+ boolean checkDataNodeAddrConfig,
+ boolean checkDataNodeHostConfig) throws IOException {
+ startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks,
+ NODE_GROUPS, hosts, simulatedCapacities, setupHostsFile,
+ checkDataNodeAddrConfig, checkDataNodeHostConfig);
+ }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
new file mode 100644
index 00000000000..33e4fa82b29
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.balancer;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSClusterWithNodeGroup;
+import org.apache.hadoop.hdfs.NameNodeProxies;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.net.NetworkTopology;
+import org.junit.Test;
+
+/**
+ * This class tests if a balancer schedules tasks correctly.
+ */
+public class TestBalancerWithNodeGroup extends TestCase {
+ private static final Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.hdfs.TestBalancerWithNodeGroup");
+
+ final private static long CAPACITY = 500L;
+ final private static String RACK0 = "/rack0";
+ final private static String RACK1 = "/rack1";
+ final private static String NODEGROUP0 = "/nodegroup0";
+ final private static String NODEGROUP1 = "/nodegroup1";
+ final private static String NODEGROUP2 = "/nodegroup2";
+ final static private String fileName = "/tmp.txt";
+ final static private Path filePath = new Path(fileName);
+ MiniDFSClusterWithNodeGroup cluster;
+
+ ClientProtocol client;
+
+ static final long TIMEOUT = 20000L; //msec
+ static final double CAPACITY_ALLOWED_VARIANCE = 0.005; // 0.5%
+ static final double BALANCE_ALLOWED_VARIANCE = 0.11; // 10%+delta
+ static final int DEFAULT_BLOCK_SIZE = 10;
+
+ static {
+ Balancer.setBlockMoveWaitTime(1000L) ;
+ }
+
+ static Configuration createConf() {
+ Configuration conf = new HdfsConfiguration();
+ TestBalancer.initConf(conf);
+ conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
+ "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
+ conf.set("dfs.block.replicator.classname",
+ "org.apache.hadoop.hdfs.server.blockmanagement." +
+ "BlockPlacementPolicyWithNodeGroup");
+ return conf;
+ }
+
+ /**
+ * Wait until heartbeat gives expected results, within CAPACITY_ALLOWED_VARIANCE,
+ * summed over all nodes. Times out after TIMEOUT msec.
+ * @param expectedUsedSpace
+ * @param expectedTotalSpace
+ * @throws IOException - if getStats() fails
+ * @throws TimeoutException
+ */
+ private void waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace)
+ throws IOException, TimeoutException {
+ long timeout = TIMEOUT;
+ long failtime = (timeout <= 0L) ? Long.MAX_VALUE
+ : System.currentTimeMillis() + timeout;
+
+ while (true) {
+ long[] status = client.getStats();
+ double totalSpaceVariance = Math.abs((double)status[0] - expectedTotalSpace)
+ / expectedTotalSpace;
+ double usedSpaceVariance = Math.abs((double)status[1] - expectedUsedSpace)
+ / expectedUsedSpace;
+ if (totalSpaceVariance < CAPACITY_ALLOWED_VARIANCE
+ && usedSpaceVariance < CAPACITY_ALLOWED_VARIANCE)
+ break; //done
+
+ if (System.currentTimeMillis() > failtime) {
+ throw new TimeoutException("Cluster failed to reached expected values of "
+ + "totalSpace (current: " + status[0]
+ + ", expected: " + expectedTotalSpace
+ + "), or usedSpace (current: " + status[1]
+ + ", expected: " + expectedUsedSpace
+ + "), in more than " + timeout + " msec.");
+ }
+ try {
+ Thread.sleep(100L);
+ } catch(InterruptedException ignored) {
+ }
+ }
+ }
+
+ /**
+ * Wait until balanced: each datanode gives utilization within
+ * BALANCE_ALLOWED_VARIANCE of average
+ * @throws IOException
+ * @throws TimeoutException
+ */
+ private void waitForBalancer(long totalUsedSpace, long totalCapacity)
+ throws IOException, TimeoutException {
+ long timeout = TIMEOUT;
+ long failtime = (timeout <= 0L) ? Long.MAX_VALUE
+ : System.currentTimeMillis() + timeout;
+ final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
+ boolean balanced;
+ do {
+ DatanodeInfo[] datanodeReport =
+ client.getDatanodeReport(DatanodeReportType.ALL);
+ assertEquals(datanodeReport.length, cluster.getDataNodes().size());
+ balanced = true;
+ for (DatanodeInfo datanode : datanodeReport) {
+ double nodeUtilization = ((double)datanode.getDfsUsed())
+ / datanode.getCapacity();
+ if (Math.abs(avgUtilization - nodeUtilization) >
+ BALANCE_ALLOWED_VARIANCE) {
+ balanced = false;
+ if (System.currentTimeMillis() > failtime) {
+ throw new TimeoutException(
+ "Rebalancing expected avg utilization to become "
+ + avgUtilization + ", but on datanode " + datanode
+ + " it remains at " + nodeUtilization
+ + " after more than " + TIMEOUT + " msec.");
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ignored) {
+ }
+ break;
+ }
+ }
+ } while (!balanced);
+ }
+
+ private void runBalancer(Configuration conf,
+ long totalUsedSpace, long totalCapacity) throws Exception {
+ waitForHeartBeat(totalUsedSpace, totalCapacity);
+
+ // start rebalancing
+ Collection namenodes = DFSUtil.getNsServiceRpcUris(conf);
+ final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
+ assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
+
+ waitForHeartBeat(totalUsedSpace, totalCapacity);
+ LOG.info("Rebalancing with default factor.");
+ waitForBalancer(totalUsedSpace, totalCapacity);
+ }
+
+ /**
+ * Create a cluster with even distribution, and a new empty node is added to
+ * the cluster, then test rack locality for balancer policy.
+ */
+ @Test
+ public void testBalancerWithRackLocality() throws Exception {
+ Configuration conf = createConf();
+ long[] capacities = new long[]{CAPACITY, CAPACITY};
+ String[] racks = new String[]{RACK0, RACK1};
+ String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP1};
+
+ int numOfDatanodes = capacities.length;
+ assertEquals(numOfDatanodes, racks.length);
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(capacities.length)
+ .racks(racks)
+ .simulatedCapacities(capacities);
+ MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
+ cluster = new MiniDFSClusterWithNodeGroup(builder);
+ try {
+ cluster.waitActive();
+ client = NameNodeProxies.createProxy(conf,
+ cluster.getFileSystem(0).getUri(),
+ ClientProtocol.class).getProxy();
+
+ long totalCapacity = TestBalancer.sum(capacities);
+
+ // fill up the cluster to be 30% full
+ long totalUsedSpace = totalCapacity * 3 / 10;
+ TestBalancer.createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
+ (short) numOfDatanodes, 0);
+
+ long newCapacity = CAPACITY;
+ String newRack = RACK1;
+ String newNodeGroup = NODEGROUP2;
+ // start up an empty node with the same capacity and on the same rack
+ cluster.startDataNodes(conf, 1, true, null, new String[]{newRack},
+ new long[] {newCapacity}, new String[]{newNodeGroup});
+
+ totalCapacity += newCapacity;
+
+ // run balancer and validate results
+ runBalancer(conf, totalUsedSpace, totalCapacity);
+
+ DatanodeInfo[] datanodeReport =
+ client.getDatanodeReport(DatanodeReportType.ALL);
+
+ Map rackToUsedCapacity = new HashMap();
+ for (DatanodeInfo datanode: datanodeReport) {
+ String rack = NetworkTopology.getFirstHalf(datanode.getNetworkLocation());
+ int usedCapacity = (int) datanode.getDfsUsed();
+
+ if (rackToUsedCapacity.get(rack) != null) {
+ rackToUsedCapacity.put(rack, usedCapacity + rackToUsedCapacity.get(rack));
+ } else {
+ rackToUsedCapacity.put(rack, usedCapacity);
+ }
+ }
+ assertEquals(rackToUsedCapacity.size(), 2);
+ assertEquals(rackToUsedCapacity.get(RACK0), rackToUsedCapacity.get(RACK1));
+
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Create a cluster with even distribution, and a new empty node is added to
+ * the cluster, then test node-group locality for balancer policy.
+ */
+ @Test
+ public void testBalancerWithNodeGroup() throws Exception {
+ Configuration conf = createConf();
+ long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY};
+ String[] racks = new String[]{RACK0, RACK0, RACK1, RACK1};
+ String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP0, NODEGROUP1, NODEGROUP2};
+
+ int numOfDatanodes = capacities.length;
+ assertEquals(numOfDatanodes, racks.length);
+ assertEquals(numOfDatanodes, nodeGroups.length);
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(capacities.length)
+ .racks(racks)
+ .simulatedCapacities(capacities);
+ MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups);
+ cluster = new MiniDFSClusterWithNodeGroup(builder);
+ try {
+ cluster.waitActive();
+ client = NameNodeProxies.createProxy(conf,
+ cluster.getFileSystem(0).getUri(),
+ ClientProtocol.class).getProxy();
+
+ long totalCapacity = TestBalancer.sum(capacities);
+ // fill up the cluster to be 20% full
+ long totalUsedSpace = totalCapacity * 2 / 10;
+ TestBalancer.createFile(cluster, filePath, totalUsedSpace / (numOfDatanodes/2),
+ (short) (numOfDatanodes/2), 0);
+
+ long newCapacity = CAPACITY;
+ String newRack = RACK1;
+ String newNodeGroup = NODEGROUP2;
+ // start up an empty node with the same capacity and on NODEGROUP2
+ cluster.startDataNodes(conf, 1, true, null, new String[]{newRack},
+ new long[] {newCapacity}, new String[]{newNodeGroup});
+
+ totalCapacity += newCapacity;
+
+ // run balancer and validate results
+ runBalancer(conf, totalUsedSpace, totalCapacity);
+
+ } finally {
+ cluster.shutdown();
+ }
+ }
+}