svn merge -c 1416691 from trunk for HDFS-4240. For nodegroup-aware block placement, when a node is excluded, the nodes in the same nodegroup should also be excluded.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1488851 13f79535-47bb-0310-9956-ffa450edef68
parent 9b8d329e65
commit 8dd8c00fbb
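At its core, this change turns the hard-coded exclusion in BlockPlacementPolicyDefault into an overridable hook: addToExcludedNodes() excludes the chosen node and reports how many nodes became newly unavailable, and BlockPlacementPolicyWithNodeGroup overrides it to exclude every node in the chosen node's nodegroup. A minimal stand-alone sketch of that pattern with simplified types (ExclusionPolicy and the dn* names are hypothetical stand-ins, not Hadoop classes):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the hook this commit introduces;
// the real code works on DatanodeDescriptor/Node and HashMap<Node, Node>.
class ExclusionPolicy {
  // Base behavior: exclude only the chosen node itself.
  // Returns the number of newly excluded nodes.
  int addToExcludedNodes(String chosen, Map<String, String> excluded) {
    return excluded.put(chosen, chosen) == null ? 1 : 0;
  }
}

class NodeGroupExclusionPolicy extends ExclusionPolicy {
  private final Map<String, List<String>> nodeGroups;

  NodeGroupExclusionPolicy(Map<String, List<String>> nodeGroups) {
    this.nodeGroups = nodeGroups;
  }

  // Nodegroup-aware behavior: exclude every peer sharing the chosen
  // node's nodegroup, counting only the ones not excluded before.
  @Override
  int addToExcludedNodes(String chosen, Map<String, String> excluded) {
    int newlyExcluded = 0;
    for (String peer : nodeGroups.get(chosen)) {
      if (excluded.put(peer, peer) == null) {
        newlyExcluded++;
      }
    }
    return newlyExcluded;
  }
}

public class ExclusionDemo {
  public static void main(String[] args) {
    Map<String, List<String>> groups = new HashMap<String, List<String>>();
    groups.put("dn1", Arrays.asList("dn1", "dn2")); // dn1, dn2 share one nodegroup
    groups.put("dn2", Arrays.asList("dn1", "dn2"));

    Map<String, String> excluded = new HashMap<String, String>();
    int n = new NodeGroupExclusionPolicy(groups).addToExcludedNodes("dn1", excluded);
    System.out.println(n); // 2 -- the whole nodegroup is now off-limits
  }
}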
@@ -307,11 +307,15 @@ Release 2.1.0-beta - UNRELEASED
     HDFS-4865. Remove sub resource warning from httpfs log at startup time.
     (ywskycn via tucu)
 
+    HDFS-4240. For nodegroup-aware block placement, when a node is excluded,
+    the nodes in the same nodegroup should also be excluded. (Junping Du
+    via szetszwo)
+
   BREAKDOWN OF HDFS-347 SUBTASKS AND RELATED JIRAS
 
     HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes.
     (Colin Patrick McCabe via todd)
 
     HDFS-4354. Create DomainSocket and DomainPeer and associated unit tests.
     (Colin Patrick McCabe via todd)
 
@@ -201,8 +201,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
 
     List<DatanodeDescriptor> results =
         new ArrayList<DatanodeDescriptor>(chosenNodes);
-    for (Node node:chosenNodes) {
-      excludedNodes.put(node, node);
+    for (DatanodeDescriptor node:chosenNodes) {
+      // add localMachine and related nodes to excludedNodes
+      addToExcludedNodes(node, excludedNodes);
       adjustExcludedNodes(excludedNodes, node);
     }
 
@@ -339,6 +340,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false,
           results, avoidStaleNodes)) {
         results.add(localMachine);
+        // add localMachine and related nodes to excludedNode
+        addToExcludedNodes(localMachine, excludedNodes);
         return localMachine;
       }
     }
@@ -347,7 +350,19 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     return chooseLocalRack(localMachine, excludedNodes, blocksize,
         maxNodesPerRack, results, avoidStaleNodes);
   }
+
+  /**
+   * Add <i>localMachine</i> and related nodes to <i>excludedNodes</i>
+   * for the next replica choice. Subclasses can add more nodes within
+   * the same failure domain of localMachine.
+   * @return number of new excluded nodes
+   */
+  protected int addToExcludedNodes(DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes) {
+    Node node = excludedNodes.put(localMachine, localMachine);
+    return node == null?1:0;
+  }
 
   /* choose one node from the rack that <i>localMachine</i> is on.
    * if no such node is available, choose one node from the rack where
    * a second replica is on.
@@ -458,6 +473,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       if (isGoodTarget(chosenNode, blocksize,
           maxNodesPerRack, results, avoidStaleNodes)) {
         results.add(chosenNode);
+        // add chosenNode and related nodes to excludedNode
+        addToExcludedNodes(chosenNode, excludedNodes);
         adjustExcludedNodes(excludedNodes, chosenNode);
         return chosenNode;
       } else {
@@ -507,6 +524,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
             maxNodesPerRack, results, avoidStaleNodes)) {
           numOfReplicas--;
           results.add(chosenNode);
+          // add chosenNode and related nodes to excludedNode
+          int newExcludedNodes = addToExcludedNodes(chosenNode, excludedNodes);
+          numOfAvailableNodes -= newExcludedNodes;
           adjustExcludedNodes(excludedNodes, chosenNode);
         } else {
           badTarget = true;
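A note on the last hunk above: chooseRandom() maintains a numOfAvailableNodes counter, so once a single choice can exclude several nodes at a time, the counter has to shrink by the number of newly excluded nodes rather than by one; otherwise the loop would keep retrying against nodes that can no longer be picked. A self-contained sketch of the counting contract this relies on (HashMap.put returns null exactly when the key was absent):

import java.util.HashMap;

public class ExcludeCountDemo {
  public static void main(String[] args) {
    HashMap<String, String> excludedNodes = new HashMap<String, String>();
    excludedNodes.put("dn2", "dn2"); // dn2 was already excluded earlier

    // Excluding the nodegroup {dn1, dn2}: only dn1 is newly excluded.
    int newExcludedNodes = 0;
    for (String dn : new String[] { "dn1", "dn2" }) {
      if (excludedNodes.put(dn, dn) == null) { // null => key was absent
        newExcludedNodes++;
      }
    }

    int numOfAvailableNodes = 10;
    numOfAvailableNodes -= newExcludedNodes; // 9, not 8
    System.out.println(numOfAvailableNodes);
  }
}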
@@ -254,6 +254,27 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
     String nodeGroupString = cur.getNetworkLocation();
     return NetworkTopology.getFirstHalf(nodeGroupString);
   }
+
+  /**
+   * Find other nodes in the same nodegroup as <i>localMachine</i> and add them
+   * to <i>excludedNodes</i>, as replicas should not be duplicated on nodes
+   * within the same nodegroup.
+   * @return number of new excluded nodes
+   */
+  protected int addToExcludedNodes(DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes) {
+    int countOfExcludedNodes = 0;
+    String nodeGroupScope = localMachine.getNetworkLocation();
+    List<Node> leafNodes = clusterMap.getLeaves(nodeGroupScope);
+    for (Node leafNode : leafNodes) {
+      Node node = excludedNodes.put(leafNode, leafNode);
+      if (node == null) {
+        // not an existing node in excludedNodes
+        countOfExcludedNodes++;
+      }
+    }
+    return countOfExcludedNodes;
+  }
 
   /**
    * Pick up replica node set for deleting replica as over-replicated.
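The override above derives the exclusion scope from the node's network location, and the tests below identify nodegroups with NetworkTopology.getLastHalf(). For a nodegroup-aware location such as /d1/r1/n1 the assumed convention is a split at the last path separator: the first half (/d1/r1) names the rack, the last half (/n1) the nodegroup. A stand-alone sketch of that assumption (firstHalf/lastHalf here merely mimic the real NetworkTopology helpers):

public class LocationSplitDemo {
  // Rack portion of a nodegroup-aware location, e.g. "/d1/r1".
  static String firstHalf(String loc) {
    return loc.substring(0, loc.lastIndexOf('/'));
  }

  // Nodegroup portion, e.g. "/n1".
  static String lastHalf(String loc) {
    return loc.substring(loc.lastIndexOf('/'));
  }

  public static void main(String[] args) {
    String loc = "/d1/r1/n1";
    System.out.println(firstHalf(loc)); // /d1/r1
    System.out.println(lastHalf(loc));  // /n1
  }
}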
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -42,6 +43,8 @@ import org.junit.Test;
 public class TestReplicationPolicyWithNodeGroup extends TestCase {
   private static final int BLOCK_SIZE = 1024;
   private static final int NUM_OF_DATANODES = 8;
+  private static final int NUM_OF_DATANODES_BOUNDARY = 6;
+  private static final int NUM_OF_DATANODES_MORE_TARGETS = 12;
   private static final Configuration CONF = new HdfsConfiguration();
   private static final NetworkTopology cluster;
   private static final NameNode namenode;
@@ -58,6 +61,32 @@ public class TestReplicationPolicyWithNodeGroup extends TestCase {
       DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r3/n5"),
       DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r3/n6")
   };
+
+  private final static DatanodeDescriptor dataNodesInBoundaryCase[] =
+      new DatanodeDescriptor[] {
+          DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1/n1"),
+          DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1/n1"),
+          DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r1/n1"),
+          DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r1/n2"),
+          DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d1/r2/n3"),
+          DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d1/r2/n3")
+  };
+
+  private final static DatanodeDescriptor dataNodesInMoreTargetsCase[] =
+      new DatanodeDescriptor[] {
+          DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/r1/n1"),
+          DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/r1/n1"),
+          DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/r1/n2"),
+          DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/r1/n2"),
+          DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/r1/n3"),
+          DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/r1/n3"),
+          DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/r2/n4"),
+          DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/r2/n4"),
+          DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/r2/n5"),
+          DFSTestUtil.getDatanodeDescriptor("10.10.10.10", "/r2/n5"),
+          DFSTestUtil.getDatanodeDescriptor("11.11.11.11", "/r2/n6"),
+          DFSTestUtil.getDatanodeDescriptor("12.12.12.12", "/r2/n6"),
+  };
 
   private final static DatanodeDescriptor NODE =
       new DatanodeDescriptor(DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/d2/r4/n7"));
@@ -71,6 +100,12 @@ public class TestReplicationPolicyWithNodeGroup extends TestCase {
         "org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup");
     CONF.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
         "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
+
+    File baseDir = new File(System.getProperty(
+        "test.build.data", "build/test/data"), "dfs/");
+    CONF.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        new File(baseDir, "name").getPath());
+
     DFSTestUtil.formatNameNode(CONF);
     namenode = new NameNode(CONF);
   } catch (IOException e) {
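The two CONF.set() calls above are also how a deployment opts into nodegroup awareness. A minimal sketch (the dfs.block.replicator.classname key is an assumption here, since the hunk elides the exact DFSConfigKeys constant; net.topology.impl is the key behind CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class NodeGroupConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // Placement policy that refuses two replicas in one nodegroup.
    conf.set("dfs.block.replicator.classname",
        "org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup");
    // Topology with the extra nodegroup layer under each rack.
    conf.set("net.topology.impl",
        "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
    System.out.println(conf.get("net.topology.impl"));
  }
}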
@@ -94,7 +129,27 @@ public class TestReplicationPolicyWithNodeGroup extends TestCase {
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
     }
   }
+
+  /**
+   * Scan the targets list: all targets should be on different NodeGroups.
+   * Return false if two targets are found on the same NodeGroup.
+   */
+  private static boolean checkTargetsOnDifferentNodeGroup(
+      DatanodeDescriptor[] targets) {
+    if(targets.length == 0)
+      return true;
+    Set<String> targetSet = new HashSet<String>();
+    for(DatanodeDescriptor node:targets) {
+      String nodeGroup = NetworkTopology.getLastHalf(node.getNetworkLocation());
+      if(targetSet.contains(nodeGroup)) {
+        return false;
+      } else {
+        targetSet.add(nodeGroup);
+      }
+    }
+    return true;
+  }
 
   /**
    * In this testcase, client is dataNodes[0]. So the 1st replica should be
    * placed on dataNodes[0], the 2nd replica should be placed on
@@ -486,5 +541,122 @@ public class TestReplicationPolicyWithNodeGroup extends TestCase {
                                       null, null, (short)1, first, second);
     assertEquals(chosenNode, dataNodes[5]);
   }
+
+  /**
+   * Test replica placement policy in case of boundary topology.
+   * Rack 2 has only one node group, so it cannot hold two replicas.
+   * The 1st replica will be placed on the writer.
+   * The 2nd replica should be placed on a different rack.
+   * The 3rd replica should be placed on the same rack as the writer, but on a
+   * different node group.
+   */
+  @Test
+  public void testChooseTargetsOnBoundaryTopology() throws Exception {
+    for(int i=0; i<NUM_OF_DATANODES; i++) {
+      cluster.remove(dataNodes[i]);
+    }
+
+    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+      cluster.add(dataNodesInBoundaryCase[i]);
+    }
+    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+      dataNodes[0].updateHeartbeat(
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
+
+      dataNodesInBoundaryCase[i].updateHeartbeat(
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+    }
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename, 0, dataNodesInBoundaryCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    targets = replicator.chooseTarget(filename, 1, dataNodesInBoundaryCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+
+    targets = replicator.chooseTarget(filename, 2, dataNodesInBoundaryCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+    targets = replicator.chooseTarget(filename, 3, dataNodesInBoundaryCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
+  }
+
+  /**
+   * Test re-replication policy in the boundary case.
+   * Rack 2 has only one node group, and the node in this node group is chosen.
+   * Rack 1 has two node groups, and one of them is chosen.
+   * The replica policy should choose a node from a node group of Rack 1, but
+   * not from the same node group as the already chosen nodes.
+   */
+  @Test
+  public void testRereplicateOnBoundaryTopology() throws Exception {
+    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+      dataNodesInBoundaryCase[i].updateHeartbeat(
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+    }
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    chosenNodes.add(dataNodesInBoundaryCase[0]);
+    chosenNodes.add(dataNodesInBoundaryCase[5]);
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename, 1, dataNodesInBoundaryCase[0],
+        chosenNodes, BLOCK_SIZE);
+    assertFalse(cluster.isOnSameNodeGroup(targets[0],
+        dataNodesInBoundaryCase[0]));
+    assertFalse(cluster.isOnSameNodeGroup(targets[0],
+        dataNodesInBoundaryCase[5]));
+    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
+  }
+
+  /**
+   * Test replica placement policy when there are more targets than NodeGroups.
+   * The 12-node cluster has only 6 NodeGroups, but in some cases, like
+   * placing a submitted job file, there is a requirement to choose more
+   * targets (10) for placing replicas. The test verifies that only 6 targets
+   * can be returned.
+   */
+  @Test
+  public void testChooseMoreTargetsThanNodeGroups() throws Exception {
+    // Cleanup nodes in previous tests
+    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+      DatanodeDescriptor node = dataNodesInBoundaryCase[i];
+      if (cluster.contains(node)) {
+        cluster.remove(node);
+      }
+    }
+
+    for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
+      cluster.add(dataNodesInMoreTargetsCase[i]);
+    }
+
+    for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
+      dataNodesInMoreTargetsCase[i].updateHeartbeat(
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+    }
+
+    DatanodeDescriptor[] targets;
+    // Test normal case -- 3 replicas
+    targets = replicator.chooseTarget(filename, 3, dataNodesInMoreTargetsCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
+
+    // Test special case -- replica number over node groups.
+    targets = replicator.chooseTarget(filename, 10, dataNodesInMoreTargetsCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
+    // Verify it can only find 6 targets for placing replicas.
+    assertEquals(targets.length, 6);
+  }
+
 }
|
||||
|
|
Loading…
Reference in New Issue
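The arithmetic behind the last assertion: each of the 6 nodegroups holds exactly 2 of the 12 datanodes, and every chosen target excludes its whole nodegroup, so availability reaches zero after 6 picks no matter how many targets were requested. A small sketch of that bookkeeping:

public class MoreTargetsMath {
  public static void main(String[] args) {
    int available = 12;     // NUM_OF_DATANODES_MORE_TARGETS
    int nodesPerGroup = 2;  // each /rX/nY above holds two datanodes
    int requested = 10;     // targets asked for
    int chosen = 0;
    while (available > 0 && chosen < requested) {
      chosen++;
      available -= nodesPerGroup; // addToExcludedNodes() reports 2 new exclusions
    }
    System.out.println(chosen);   // 6
  }
}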