HDFS-4240. For nodegroup-aware block placement, when a node is excluded, the nodes in the same nodegroup should also be excluded. Contributed by Junping Du

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1416691 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-12-03 21:59:36 +00:00
parent e5d54ac89a
commit 8f7c92094d
4 changed files with 222 additions and 5 deletions

View File

@ -271,6 +271,10 @@ Trunk (Unreleased)
HDFS-4105. The SPNEGO user for secondary namenode should use the web
keytab. (Arpit Gupta via jitendra)
HDFS-4240. For nodegroup-aware block placement, when a node is excluded,
the nodes in the same nodegroup should also be excluded. (Junping Du
via szetszwo)
BREAKDOWN OF HDFS-3077 SUBTASKS
HDFS-3077. Quorum-based protocol for reading and writing edit logs.

View File

@ -152,8 +152,9 @@ DatanodeDescriptor[] chooseTarget(int numOfReplicas,
List<DatanodeDescriptor> results =
new ArrayList<DatanodeDescriptor>(chosenNodes);
for (Node node:chosenNodes) {
excludedNodes.put(node, node);
for (DatanodeDescriptor node:chosenNodes) {
// add localMachine and related nodes to excludedNodes
addToExcludedNodes(node, excludedNodes);
adjustExcludedNodes(excludedNodes, node);
}
@ -235,7 +236,7 @@ private DatanodeDescriptor chooseTarget(int numOfReplicas,
+ totalReplicasExpected + "\n"
+ e.getMessage());
if (avoidStaleNodes) {
// ecxludedNodes now has - initial excludedNodes, any nodes that were
// excludedNodes now has - initial excludedNodes, any nodes that were
// chosen and nodes that were tried but were not chosen because they
// were stale, decommissioned or for any other reason a node is not
// chosen for write. Retry again now not avoiding stale node
@ -273,6 +274,8 @@ protected DatanodeDescriptor chooseLocalNode(
if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false,
results, avoidStaleNodes)) {
results.add(localMachine);
// add localMachine and related nodes to excludedNode
addToExcludedNodes(localMachine, excludedNodes);
return localMachine;
}
}
@ -281,7 +284,19 @@ protected DatanodeDescriptor chooseLocalNode(
return chooseLocalRack(localMachine, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
}
/**
* Add <i>localMachine</i> and related nodes to <i>excludedNodes</i>
* for next replica choosing. In sub class, we can add more nodes within
* the same failure domain of localMachine
* @return number of new excluded nodes
*/
protected int addToExcludedNodes(DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes) {
Node node = excludedNodes.put(localMachine, localMachine);
return node == null?1:0;
}
/* choose one node from the rack that <i>localMachine</i> is on.
* if no such node is available, choose one node from the rack where
* a second replica is on.
@ -392,6 +407,8 @@ protected DatanodeDescriptor chooseRandom(
if (isGoodTarget(chosenNode, blocksize,
maxNodesPerRack, results, avoidStaleNodes)) {
results.add(chosenNode);
// add chosenNode and related nodes to excludedNode
addToExcludedNodes(chosenNode, excludedNodes);
adjustExcludedNodes(excludedNodes, chosenNode);
return chosenNode;
} else {
@ -441,6 +458,9 @@ protected void chooseRandom(int numOfReplicas,
maxNodesPerRack, results, avoidStaleNodes)) {
numOfReplicas--;
results.add(chosenNode);
// add chosenNode and related nodes to excludedNode
int newExcludedNodes = addToExcludedNodes(chosenNode, excludedNodes);
numOfAvailableNodes -= newExcludedNodes;
adjustExcludedNodes(excludedNodes, chosenNode);
} else {
badTarget = true;

View File

@ -240,6 +240,27 @@ protected String getRack(final DatanodeInfo cur) {
String nodeGroupString = cur.getNetworkLocation();
return NetworkTopology.getFirstHalf(nodeGroupString);
}
/**
* Find other nodes in the same nodegroup of <i>localMachine</i> and add them
* into <i>excludeNodes</i> as replica should not be duplicated for nodes
* within the same nodegroup
* @return number of new excluded nodes
*/
protected int addToExcludedNodes(DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes) {
int countOfExcludedNodes = 0;
String nodeGroupScope = localMachine.getNetworkLocation();
List<Node> leafNodes = clusterMap.getLeaves(nodeGroupScope);
for (Node leafNode : leafNodes) {
Node node = excludedNodes.put(leafNode, leafNode);
if (node == null) {
// not a existing node in excludedNodes
countOfExcludedNodes++;
}
}
return countOfExcludedNodes;
}
/**
* Pick up replica node set for deleting replica as over-replicated.

View File

@ -21,6 +21,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@ -45,6 +46,8 @@
public class TestReplicationPolicyWithNodeGroup {
private static final int BLOCK_SIZE = 1024;
private static final int NUM_OF_DATANODES = 8;
private static final int NUM_OF_DATANODES_BOUNDARY = 6;
private static final int NUM_OF_DATANODES_MORE_TARGETS = 12;
private static final Configuration CONF = new HdfsConfiguration();
private static final NetworkTopology cluster;
private static final NameNode namenode;
@ -61,6 +64,32 @@ public class TestReplicationPolicyWithNodeGroup {
DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r3/n5"),
DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r3/n6")
};
private final static DatanodeDescriptor dataNodesInBoundaryCase[] =
new DatanodeDescriptor[] {
DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1/n1"),
DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1/n1"),
DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r1/n1"),
DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r1/n2"),
DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d1/r2/n3"),
DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d1/r2/n3")
};
private final static DatanodeDescriptor dataNodesInMoreTargetsCase[] =
new DatanodeDescriptor[] {
DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/r1/n1"),
DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/r1/n1"),
DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/r1/n2"),
DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/r1/n2"),
DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/r1/n3"),
DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/r1/n3"),
DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/r2/n4"),
DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/r2/n4"),
DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/r2/n5"),
DFSTestUtil.getDatanodeDescriptor("10.10.10.10", "/r2/n5"),
DFSTestUtil.getDatanodeDescriptor("11.11.11.11", "/r2/n6"),
DFSTestUtil.getDatanodeDescriptor("12.12.12.12", "/r2/n6"),
};
private final static DatanodeDescriptor NODE =
new DatanodeDescriptor(DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/d2/r4/n7"));
@ -74,6 +103,12 @@ public class TestReplicationPolicyWithNodeGroup {
"org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup");
CONF.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
"org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
File baseDir = new File(System.getProperty(
"test.build.data", "build/test/data"), "dfs/");
CONF.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
new File(baseDir, "name").getPath());
DFSTestUtil.formatNameNode(CONF);
namenode = new NameNode(CONF);
} catch (IOException e) {
@ -97,7 +132,27 @@ private static void setupDataNodeCapacity() {
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
}
}
/**
* Scan the targets list: all targets should be on different NodeGroups.
* Return false if two targets are found on the same NodeGroup.
*/
private static boolean checkTargetsOnDifferentNodeGroup(
DatanodeDescriptor[] targets) {
if(targets.length == 0)
return true;
Set<String> targetSet = new HashSet<String>();
for(DatanodeDescriptor node:targets) {
String nodeGroup = NetworkTopology.getLastHalf(node.getNetworkLocation());
if(targetSet.contains(nodeGroup)) {
return false;
} else {
targetSet.add(nodeGroup);
}
}
return true;
}
/**
* In this testcase, client is dataNodes[0]. So the 1st replica should be
* placed on dataNodes[0], the 2nd replica should be placed on
@ -497,5 +552,122 @@ public void testChooseReplicaToDelete() throws Exception {
null, null, (short)1, first, second);
assertEquals(chosenNode, dataNodes[5]);
}
/**
* Test replica placement policy in case of boundary topology.
* Rack 2 has only 1 node group & can't be placed with two replicas
* The 1st replica will be placed on writer.
* The 2nd replica should be placed on a different rack
* The 3rd replica should be placed on the same rack with writer, but on a
* different node group.
*/
@Test
public void testChooseTargetsOnBoundaryTopology() throws Exception {
for(int i=0; i<NUM_OF_DATANODES; i++) {
cluster.remove(dataNodes[i]);
}
for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
cluster.add(dataNodesInBoundaryCase[i]);
}
for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
dataNodes[0].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
dataNodesInBoundaryCase[i].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
}
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename, 0, dataNodesInBoundaryCase[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename, 1, dataNodesInBoundaryCase[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 1);
targets = replicator.chooseTarget(filename, 2, dataNodesInBoundaryCase[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 2);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
targets = replicator.chooseTarget(filename, 3, dataNodesInBoundaryCase[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 3);
assertTrue(checkTargetsOnDifferentNodeGroup(targets));
}
/**
* Test re-replication policy in boundary case.
* Rack 2 has only one node group & the node in this node group is chosen
* Rack 1 has two nodegroups & one of them is chosen.
* Replica policy should choose the node from node group of Rack1 but not the
* same nodegroup with chosen nodes.
*/
@Test
public void testRereplicateOnBoundaryTopology() throws Exception {
for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
dataNodesInBoundaryCase[i].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
}
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
chosenNodes.add(dataNodesInBoundaryCase[0]);
chosenNodes.add(dataNodesInBoundaryCase[5]);
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename, 1, dataNodesInBoundaryCase[0],
chosenNodes, BLOCK_SIZE);
assertFalse(cluster.isOnSameNodeGroup(targets[0],
dataNodesInBoundaryCase[0]));
assertFalse(cluster.isOnSameNodeGroup(targets[0],
dataNodesInBoundaryCase[5]));
assertTrue(checkTargetsOnDifferentNodeGroup(targets));
}
/**
* Test replica placement policy in case of targets more than number of
* NodeGroups.
* The 12-nodes cluster only has 6 NodeGroups, but in some cases, like:
* placing submitted job file, there is requirement to choose more (10)
* targets for placing replica. We should test it can return 6 targets.
*/
@Test
public void testChooseMoreTargetsThanNodeGroups() throws Exception {
// Cleanup nodes in previous tests
for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
DatanodeDescriptor node = dataNodesInBoundaryCase[i];
if (cluster.contains(node)) {
cluster.remove(node);
}
}
for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
cluster.add(dataNodesInMoreTargetsCase[i]);
}
for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
dataNodesInMoreTargetsCase[i].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
}
DatanodeDescriptor[] targets;
// Test normal case -- 3 replicas
targets = replicator.chooseTarget(filename, 3, dataNodesInMoreTargetsCase[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 3);
assertTrue(checkTargetsOnDifferentNodeGroup(targets));
// Test special case -- replica number over node groups.
targets = replicator.chooseTarget(filename, 10, dataNodesInMoreTargetsCase[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertTrue(checkTargetsOnDifferentNodeGroup(targets));
// Verify it only can find 6 targets for placing replicas.
assertEquals(targets.length, 6);
}
}