From 8f7c92094dfc3ea692e5f893126b89987fce2006 Mon Sep 17 00:00:00 2001
From: Tsz-wo Sze
Date: Mon, 3 Dec 2012 21:59:36 +0000
Subject: [PATCH] HDFS-4240. For nodegroup-aware block placement, when a node
 is excluded, the nodes in the same nodegroup should also be excluded.
 Contributed by Junping Du

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1416691 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt |   4 +
 .../BlockPlacementPolicyDefault.java        |  28 ++-
 .../BlockPlacementPolicyWithNodeGroup.java  |  21 +++
 .../TestReplicationPolicyWithNodeGroup.java | 174 +++++++++++++++++-
 4 files changed, 222 insertions(+), 5 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c4efe58ae2f..9c39f9122a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -271,6 +271,10 @@ Trunk (Unreleased)
     HDFS-4105. The SPNEGO user for secondary namenode should use the web
     keytab. (Arpit Gupta via jitendra)
 
+    HDFS-4240. For nodegroup-aware block placement, when a node is excluded,
+    the nodes in the same nodegroup should also be excluded. (Junping Du
+    via szetszwo)
+
   BREAKDOWN OF HDFS-3077 SUBTASKS
 
     HDFS-3077. Quorum-based protocol for reading and writing edit logs.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index f976c996153..8383dc27e8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -152,8 +152,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     List<DatanodeDescriptor> results =
         new ArrayList<DatanodeDescriptor>(chosenNodes);
-    for (Node node:chosenNodes) {
-      excludedNodes.put(node, node);
+    for (DatanodeDescriptor node:chosenNodes) {
+      // add localMachine and related nodes to excludedNodes
+      addToExcludedNodes(node, excludedNodes);
       adjustExcludedNodes(excludedNodes, node);
     }
 
@@ -235,7 +236,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
           + totalReplicasExpected + "\n" + e.getMessage());
       if (avoidStaleNodes) {
-        // ecxludedNodes now has - initial excludedNodes, any nodes that were
+        // excludedNodes now has - initial excludedNodes, any nodes that were
         // chosen and nodes that were tried but were not chosen because they
         // were stale, decommissioned or for any other reason a node is not
         // chosen for write. Retry again now not avoiding stale node
@@ -273,6 +274,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       if (isGoodTarget(localMachine, blocksize,
           maxNodesPerRack, false, results, avoidStaleNodes)) {
         results.add(localMachine);
+        // add localMachine and related nodes to excludedNodes
+        addToExcludedNodes(localMachine, excludedNodes);
         return localMachine;
       }
     }
@@ -281,7 +284,19 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     return chooseLocalRack(localMachine, excludedNodes, blocksize,
         maxNodesPerRack, results, avoidStaleNodes);
   }
-  
+
+  /**
+   * Add localMachine and related nodes to excludedNodes
+   * for next replica choosing. In subclasses, more nodes within the same
+   * failure domain of localMachine can be added.
+   * @return number of new excluded nodes
+   */
+  protected int addToExcludedNodes(DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes) {
+    Node node = excludedNodes.put(localMachine, localMachine);
+    return node == null ? 1 : 0;
+  }
+
   /* choose one node from the rack that localMachine is on.
    * if no such node is available, choose one node from the rack where
    * a second replica is on.
@@ -392,6 +407,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack,
             results, avoidStaleNodes)) {
           results.add(chosenNode);
+          // add chosenNode and related nodes to excludedNodes
+          addToExcludedNodes(chosenNode, excludedNodes);
           adjustExcludedNodes(excludedNodes, chosenNode);
           return chosenNode;
         } else {
@@ -441,6 +458,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
             maxNodesPerRack, results, avoidStaleNodes)) {
           numOfReplicas--;
           results.add(chosenNode);
+          // add chosenNode and related nodes to excludedNodes
+          int newExcludedNodes = addToExcludedNodes(chosenNode, excludedNodes);
+          numOfAvailableNodes -= newExcludedNodes;
           adjustExcludedNodes(excludedNodes, chosenNode);
         } else {
           badTarget = true;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
index c575fa8e115..643d2b401cd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
@@ -240,6 +240,27 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
     String nodeGroupString = cur.getNetworkLocation();
     return NetworkTopology.getFirstHalf(nodeGroupString);
   }
+
+  /**
+   * Find other nodes in the same nodegroup as localMachine and add them
+   * to excludedNodes, since replicas should not be duplicated on nodes
+   * within the same nodegroup.
+   * @return number of new excluded nodes
+   */
+  protected int addToExcludedNodes(DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes) {
+    int countOfExcludedNodes = 0;
+    String nodeGroupScope = localMachine.getNetworkLocation();
+    List<Node> leafNodes = clusterMap.getLeaves(nodeGroupScope);
+    for (Node leafNode : leafNodes) {
+      Node node = excludedNodes.put(leafNode, leafNode);
+      if (node == null) {
+        // not an existing node in excludedNodes
+        countOfExcludedNodes++;
+      }
+    }
+    return countOfExcludedNodes;
+  }
 
   /**
    * Pick up replica node set for deleting replica as over-replicated.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
index d8efd3a029b..032c2c08396 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyWithNodeGroup.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -45,6 +46,8 @@ import org.junit.Test;
 public class TestReplicationPolicyWithNodeGroup {
   private static final int BLOCK_SIZE = 1024;
   private static final int NUM_OF_DATANODES = 8;
+  private static final int NUM_OF_DATANODES_BOUNDARY = 6;
+  private static final int NUM_OF_DATANODES_MORE_TARGETS = 12;
   private static final Configuration CONF = new HdfsConfiguration();
   private static final NetworkTopology cluster;
   private static final NameNode namenode;
@@ -61,6 +64,32 @@ public class TestReplicationPolicyWithNodeGroup {
       DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r3/n5"),
       DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r3/n6")
   };
+
+  private final static DatanodeDescriptor dataNodesInBoundaryCase[] =
+      new DatanodeDescriptor[] {
+          DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1/n1"),
+          DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1/n1"),
+          DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r1/n1"),
+          DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r1/n2"),
+          DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d1/r2/n3"),
+          DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d1/r2/n3")
+      };
+
+  private final static DatanodeDescriptor dataNodesInMoreTargetsCase[] =
+      new DatanodeDescriptor[] {
+          DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/r1/n1"),
+          DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/r1/n1"),
+          DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/r1/n2"),
+          DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/r1/n2"),
+          DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/r1/n3"),
+          DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/r1/n3"),
+          DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/r2/n4"),
+          DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/r2/n4"),
+          DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/r2/n5"),
+          DFSTestUtil.getDatanodeDescriptor("10.10.10.10", "/r2/n5"),
+          DFSTestUtil.getDatanodeDescriptor("11.11.11.11", "/r2/n6"),
+          DFSTestUtil.getDatanodeDescriptor("12.12.12.12", "/r2/n6"),
+      };
 
   private final static DatanodeDescriptor NODE = new DatanodeDescriptor(
       DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/d2/r4/n7"));
@@ -74,6 +103,12 @@
           "org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup");
       CONF.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
           "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
+
+      File baseDir = new File(System.getProperty(
+          "test.build.data", "build/test/data"), "dfs/");
+      CONF.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+          new File(baseDir, "name").getPath());
+
       DFSTestUtil.formatNameNode(CONF);
       namenode = new NameNode(CONF);
     } catch (IOException e) {
@@ -97,7 +132,27 @@ public class TestReplicationPolicyWithNodeGroup {
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
     }
   }
-  
+
+  /**
+   * Scan the targets list: all targets should be on different NodeGroups.
+   * Return false if two targets are found on the same NodeGroup.
+   */
+  private static boolean checkTargetsOnDifferentNodeGroup(
+      DatanodeDescriptor[] targets) {
+    if(targets.length == 0)
+      return true;
+    Set<String> targetSet = new HashSet<String>();
+    for(DatanodeDescriptor node:targets) {
+      String nodeGroup = NetworkTopology.getLastHalf(node.getNetworkLocation());
+      if(targetSet.contains(nodeGroup)) {
+        return false;
+      } else {
+        targetSet.add(nodeGroup);
+      }
+    }
+    return true;
+  }
+
   /**
    * In this testcase, client is dataNodes[0]. So the 1st replica should be
    * placed on dataNodes[0], the 2nd replica should be placed on
@@ -497,5 +552,122 @@
         null, null, (short)1, first, second);
     assertEquals(chosenNode, dataNodes[5]);
   }
+
+  /**
+   * Test replica placement policy in case of boundary topology.
+   * Rack 2 has only one node group, so it cannot hold two replicas.
+   * The 1st replica will be placed on the writer.
+   * The 2nd replica should be placed on a different rack.
+   * The 3rd replica should be placed on the same rack as the writer, but in a
+   * different node group.
+   */
+  @Test
+  public void testChooseTargetsOnBoundaryTopology() throws Exception {
+    for(int i=0; i<NUM_OF_DATANODES; i++) {
+      cluster.remove(dataNodes[i]);
+    }
+
+    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+      cluster.add(dataNodesInBoundaryCase[i]);
+    }
+    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+      dataNodesInBoundaryCase[i].updateHeartbeat(
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+    }
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename, 0, dataNodesInBoundaryCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 0);
+
+    targets = replicator.chooseTarget(filename, 1, dataNodesInBoundaryCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 1);
+
+    targets = replicator.chooseTarget(filename, 2, dataNodesInBoundaryCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 2);
+    assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
+
+    targets = replicator.chooseTarget(filename, 3, dataNodesInBoundaryCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
+  }
+
+  /**
+   * Test re-replication policy in the boundary case.
+   * Rack 2 has only one node group, and the node in that node group is
+   * already chosen. Rack 1 has two node groups, and one of them is already
+   * chosen. The placement policy should choose a node from the remaining
+   * node group of rack 1, i.e. not from a node group that already holds a
+   * chosen node.
+   */
+  @Test
+  public void testRereplicateOnBoundaryTopology() throws Exception {
+    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+      dataNodesInBoundaryCase[i].updateHeartbeat(
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+    }
+    List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
+    chosenNodes.add(dataNodesInBoundaryCase[0]);
+    chosenNodes.add(dataNodesInBoundaryCase[5]);
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename, 1, dataNodesInBoundaryCase[0],
+        chosenNodes, BLOCK_SIZE);
+    assertFalse(cluster.isOnSameNodeGroup(targets[0],
+        dataNodesInBoundaryCase[0]));
+    assertFalse(cluster.isOnSameNodeGroup(targets[0],
+        dataNodesInBoundaryCase[5]));
+    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
+  }
+
+  /**
+   * Test replica placement policy when more targets are requested than there
+   * are NodeGroups.
+   * The 12-node cluster only has 6 NodeGroups, but in some cases, such as
+   * placing a submitted job file, more targets (10) are requested for placing
+   * replicas. The policy should then return only 6 targets.
+   */
+  @Test
+  public void testChooseMoreTargetsThanNodeGroups() throws Exception {
+    // Cleanup nodes in previous tests
+    for(int i=0; i<NUM_OF_DATANODES_BOUNDARY; i++) {
+      DatanodeDescriptor node = dataNodesInBoundaryCase[i];
+      if (cluster.contains(node)) {
+        cluster.remove(node);
+      }
+    }
+
+    for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
+      cluster.add(dataNodesInMoreTargetsCase[i]);
+    }
+    for(int i=0; i<NUM_OF_DATANODES_MORE_TARGETS; i++) {
+      dataNodesInMoreTargetsCase[i].updateHeartbeat(
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+    }
+
+    DatanodeDescriptor[] targets;
+    targets = replicator.chooseTarget(filename, 3, dataNodesInMoreTargetsCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertEquals(targets.length, 3);
+    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
+
+    // Test special case -- replica number exceeds the number of node groups.
+    targets = replicator.chooseTarget(filename, 10, dataNodesInMoreTargetsCase[0],
+        new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
+    assertTrue(checkTargetsOnDifferentNodeGroup(targets));
+    // Verify that it can only find 6 targets for placing replicas.
+    assertEquals(targets.length, 6);
+  }
+
 }
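
For readers skimming the diff, the effect of the new addToExcludedNodes hook can be seen in a small, self-contained sketch. Everything below is a simplified illustration rather than HDFS code: the class name NodeGroupExclusionDemo, the NODE_GROUP map, and the plain String/Set types are hypothetical stand-ins for DatanodeDescriptor, HashMap<Node, Node>, and NetworkTopologyWithNodeGroup. It shows why excluding a chosen node's whole nodegroup can shrink the pool of candidate targets by more than one, which is why chooseRandom in the patch now subtracts the value returned by addToExcludedNodes from numOfAvailableNodes.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical illustration only -- not part of HDFS.
public class NodeGroupExclusionDemo {

  // node -> nodegroup, mirroring the boundary-case topology used in the test
  // (three nodes in /d1/r1/n1, one in /d1/r1/n2, two in /d1/r2/n3).
  private static final Map<String, String> NODE_GROUP = new HashMap<String, String>();
  static {
    NODE_GROUP.put("1.1.1.1", "/d1/r1/n1");
    NODE_GROUP.put("2.2.2.2", "/d1/r1/n1");
    NODE_GROUP.put("3.3.3.3", "/d1/r1/n1");
    NODE_GROUP.put("4.4.4.4", "/d1/r1/n2");
    NODE_GROUP.put("5.5.5.5", "/d1/r2/n3");
    NODE_GROUP.put("6.6.6.6", "/d1/r2/n3");
  }

  // Analogue of the overridden addToExcludedNodes: exclude every node in the
  // chosen node's nodegroup and report how many exclusions are new.
  static int addToExcludedNodes(String chosen, Set<String> excluded) {
    String group = NODE_GROUP.get(chosen);
    int newlyExcluded = 0;
    for (Map.Entry<String, String> e : NODE_GROUP.entrySet()) {
      if (e.getValue().equals(group) && excluded.add(e.getKey())) {
        newlyExcluded++;  // was not excluded before
      }
    }
    return newlyExcluded;
  }

  public static void main(String[] args) {
    Set<String> excluded = new HashSet<String>();
    int available = NODE_GROUP.size();                     // 6 candidate nodes
    available -= addToExcludedNodes("1.1.1.1", excluded);  // also rules out 2.2.2.2 and 3.3.3.3
    System.out.println("excluded=" + excluded + " available=" + available);  // available = 3
  }
}

Under the previous behavior, choosing a node removed only that single node from consideration, so two other nodes sharing its failure domain still looked like valid targets; with the nodegroup-aware exclusion above, the available count drops by the whole group at once.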