HDFS-6250. Fix test failed in TestBalancerWithNodeGroup.testBalancerWithRackLocality (Contributed by Binglin Chang and Chen He)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1595145 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
0c5e2f6c73
commit
0fb47dbe4e
|
@ -489,6 +489,9 @@ Release 2.5.0 - UNRELEASED
|
||||||
|
|
||||||
HDFS-6400. Cannot execute hdfs oiv_legacy. (Akira AJISAKA via kihwal)
|
HDFS-6400. Cannot execute hdfs oiv_legacy. (Akira AJISAKA via kihwal)
|
||||||
|
|
||||||
|
HDFS-6250. Fix test failed in TestBalancerWithNodeGroup.testBalancerWithRackLocality
|
||||||
|
(Binglin Chang and Chen He via junping_du)
|
||||||
|
|
||||||
Release 2.4.1 - UNRELEASED
|
Release 2.4.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -170,7 +170,7 @@ class NameNodeConnector {
|
||||||
}
|
}
|
||||||
|
|
||||||
/* The idea for making sure that there is no more than one balancer
|
/* The idea for making sure that there is no more than one balancer
|
||||||
* running in an HDFS is to create a file in the HDFS, writes the IP address
|
* running in an HDFS is to create a file in the HDFS, writes the hostname
|
||||||
* of the machine on which the balancer is running to the file, but did not
|
* of the machine on which the balancer is running to the file, but did not
|
||||||
* close the file until the balancer exits.
|
* close the file until the balancer exits.
|
||||||
* This prevents the second balancer from running because it can not
|
* This prevents the second balancer from running because it can not
|
||||||
|
|
|
@ -22,8 +22,9 @@ import static org.junit.Assert.assertEquals;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -39,6 +40,9 @@ import org.apache.hadoop.hdfs.MiniDFSClusterWithNodeGroup;
|
||||||
import org.apache.hadoop.hdfs.NameNodeProxies;
|
import org.apache.hadoop.hdfs.NameNodeProxies;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup;
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup;
|
||||||
import org.apache.hadoop.net.NetworkTopology;
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
|
@ -53,7 +57,7 @@ public class TestBalancerWithNodeGroup {
|
||||||
private static final Log LOG = LogFactory.getLog(
|
private static final Log LOG = LogFactory.getLog(
|
||||||
"org.apache.hadoop.hdfs.TestBalancerWithNodeGroup");
|
"org.apache.hadoop.hdfs.TestBalancerWithNodeGroup");
|
||||||
|
|
||||||
final private static long CAPACITY = 6000L;
|
final private static long CAPACITY = 5000L;
|
||||||
final private static String RACK0 = "/rack0";
|
final private static String RACK0 = "/rack0";
|
||||||
final private static String RACK1 = "/rack1";
|
final private static String RACK1 = "/rack1";
|
||||||
final private static String NODEGROUP0 = "/nodegroup0";
|
final private static String NODEGROUP0 = "/nodegroup0";
|
||||||
|
@ -77,6 +81,7 @@ public class TestBalancerWithNodeGroup {
|
||||||
static Configuration createConf() {
|
static Configuration createConf() {
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
TestBalancer.initConf(conf);
|
TestBalancer.initConf(conf);
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
|
||||||
conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
|
conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
|
||||||
NetworkTopologyWithNodeGroup.class.getName());
|
NetworkTopologyWithNodeGroup.class.getName());
|
||||||
conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
|
conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
|
||||||
|
@ -191,6 +196,19 @@ public class TestBalancerWithNodeGroup {
|
||||||
LOG.info("Rebalancing with default factor.");
|
LOG.info("Rebalancing with default factor.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Set<ExtendedBlock> getBlocksOnRack(List<LocatedBlock> blks, String rack) {
|
||||||
|
Set<ExtendedBlock> ret = new HashSet<ExtendedBlock>();
|
||||||
|
for (LocatedBlock blk : blks) {
|
||||||
|
for (DatanodeInfo di : blk.getLocations()) {
|
||||||
|
if (rack.equals(NetworkTopology.getFirstHalf(di.getNetworkLocation()))) {
|
||||||
|
ret.add(blk.getBlock());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a cluster with even distribution, and a new empty node is added to
|
* Create a cluster with even distribution, and a new empty node is added to
|
||||||
* the cluster, then test rack locality for balancer policy.
|
* the cluster, then test rack locality for balancer policy.
|
||||||
|
@ -220,9 +238,14 @@ public class TestBalancerWithNodeGroup {
|
||||||
|
|
||||||
// fill up the cluster to be 30% full
|
// fill up the cluster to be 30% full
|
||||||
long totalUsedSpace = totalCapacity * 3 / 10;
|
long totalUsedSpace = totalCapacity * 3 / 10;
|
||||||
TestBalancer.createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
|
long length = totalUsedSpace / numOfDatanodes;
|
||||||
|
TestBalancer.createFile(cluster, filePath, length,
|
||||||
(short) numOfDatanodes, 0);
|
(short) numOfDatanodes, 0);
|
||||||
|
|
||||||
|
LocatedBlocks lbs = client.getBlockLocations(filePath.toUri().getPath(), 0,
|
||||||
|
length);
|
||||||
|
Set<ExtendedBlock> before = getBlocksOnRack(lbs.getLocatedBlocks(), RACK0);
|
||||||
|
|
||||||
long newCapacity = CAPACITY;
|
long newCapacity = CAPACITY;
|
||||||
String newRack = RACK1;
|
String newRack = RACK1;
|
||||||
String newNodeGroup = NODEGROUP2;
|
String newNodeGroup = NODEGROUP2;
|
||||||
|
@ -235,22 +258,9 @@ public class TestBalancerWithNodeGroup {
|
||||||
// run balancer and validate results
|
// run balancer and validate results
|
||||||
runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
|
runBalancerCanFinish(conf, totalUsedSpace, totalCapacity);
|
||||||
|
|
||||||
DatanodeInfo[] datanodeReport =
|
lbs = client.getBlockLocations(filePath.toUri().getPath(), 0, length);
|
||||||
client.getDatanodeReport(DatanodeReportType.ALL);
|
Set<ExtendedBlock> after = getBlocksOnRack(lbs.getLocatedBlocks(), RACK0);
|
||||||
|
assertEquals(before, after);
|
||||||
Map<String, Integer> rackToUsedCapacity = new HashMap<String, Integer>();
|
|
||||||
for (DatanodeInfo datanode: datanodeReport) {
|
|
||||||
String rack = NetworkTopology.getFirstHalf(datanode.getNetworkLocation());
|
|
||||||
int usedCapacity = (int) datanode.getDfsUsed();
|
|
||||||
|
|
||||||
if (rackToUsedCapacity.get(rack) != null) {
|
|
||||||
rackToUsedCapacity.put(rack, usedCapacity + rackToUsedCapacity.get(rack));
|
|
||||||
} else {
|
|
||||||
rackToUsedCapacity.put(rack, usedCapacity);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assertEquals(rackToUsedCapacity.size(), 2);
|
|
||||||
assertEquals(rackToUsedCapacity.get(RACK0), rackToUsedCapacity.get(RACK1));
|
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
|
|
Loading…
Reference in New Issue