svn merge -c 1357442 from trunk for HDFS-3601. Add BlockPlacementPolicyWithNodeGroup to support block placement with 4-layer network topology.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1488845 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2013-06-03 04:59:48 +00:00
parent 4c734a8ee8
commit f8bb18770b
6 changed files with 860 additions and 6 deletions

View File

@ -171,8 +171,7 @@ public class NetworkTopology {
}
if (parentNode == null) {
// create a new InnerNode
parentNode = new InnerNode(parentName, getPath(this),
this, this.getLevel()+1);
parentNode = createParentNode(parentName);
children.add(parentNode);
}
// add n to the subtree of the next ancestor node
@ -289,7 +288,7 @@ public class NetworkTopology {
// calculate the total number of excluded leaf nodes
int numOfExcludedLeaves =
isLeaf ? 1 : ((InnerNode)excludedNode).getNumOfLeaves();
if (isRack()) { // children are leaves
if (isLeafParent()) { // children are leaves
if (isLeaf) { // excluded node is a leaf node
int excludedIndex = children.indexOf(excludedNode);
if (excludedIndex != -1 && leafIndex >= 0) {
@ -327,6 +326,10 @@ public class NetworkTopology {
}
}
protected boolean isLeafParent() {
return isRack();
}
/**
* Determine if children a leaves, default implementation calls {@link #isRack()}
* <p>To be overridden in subclasses for specific InnerNode implementations,
@ -776,6 +779,30 @@ public class NetworkTopology {
}
return tree.toString();
}
/**
* Divide networklocation string into two parts by last separator, and get
* the first part here.
*
* @param networkLocation
* @return
*/
public static String getFirstHalf(String networkLocation) {
int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
return networkLocation.substring(0, index);
}
/**
* Divide networklocation string into two parts by last separator, and get
* the second part here.
*
* @param networkLocation
* @return
*/
public static String getLastHalf(String networkLocation) {
int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
return networkLocation.substring(index);
}
/** swap two array items */
static protected void swap(Node[] nodes, int i, int j) {

View File

@ -49,7 +49,7 @@ public class NetworkTopologyWithNodeGroup extends NetworkTopology {
}
Node nodeGroup = getNode(node.getNetworkLocation());
if (nodeGroup == null) {
nodeGroup = new InnerNode(node.getNetworkLocation());
nodeGroup = new InnerNodeWithNodeGroup(node.getNetworkLocation());
}
return getNode(nodeGroup.getNetworkLocation());
}
@ -383,6 +383,11 @@ public class NetworkTopologyWithNodeGroup extends NetworkTopology {
}
return true;
}
@Override
protected boolean isLeafParent() {
return isNodeGroup();
}
@Override
protected InnerNode createParentNode(String parentName) {

View File

@ -40,6 +40,9 @@ Release 2.1.0-beta - UNRELEASED
Azure environments. (See breakdown of tasks below for subtasks and
contributors)
HDFS-3601. Add BlockPlacementPolicyWithNodeGroup to support block placement
with 4-layer network topology. (Junping Du via szetszwo)
IMPROVEMENTS
HDFS-4222. NN is unresponsive and loses heartbeats from DNs when

View File

@ -203,6 +203,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
new ArrayList<DatanodeDescriptor>(chosenNodes);
for (Node node:chosenNodes) {
excludedNodes.put(node, node);
adjustExcludedNodes(excludedNodes, node);
}
if (!clusterMap.contains(writer)) {
@ -452,11 +453,12 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
(DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
Node oldNode = excludedNodes.put(chosenNode, chosenNode);
if (oldNode == null) { // choosendNode was not in the excluded list
if (oldNode == null) { // chosenNode was not in the excluded list
numOfAvailableNodes--;
if (isGoodTarget(chosenNode, blocksize,
maxNodesPerRack, results, avoidStaleNodes)) {
results.add(chosenNode);
adjustExcludedNodes(excludedNodes, chosenNode);
return chosenNode;
} else {
badTarget = true;
@ -505,6 +507,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
maxNodesPerRack, results, avoidStaleNodes)) {
numOfReplicas--;
results.add(chosenNode);
adjustExcludedNodes(excludedNodes, chosenNode);
} else {
badTarget = true;
}
@ -522,7 +525,21 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
throw new NotEnoughReplicasException(detail);
}
}
/**
* After choosing a node to place replica, adjust excluded nodes accordingly.
* It should do nothing here as chosenNode is already put into exlcudeNodes,
* but it can be overridden in subclass to put more related nodes into
* excludedNodes.
*
* @param excludedNodes
* @param chosenNode
*/
protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
Node chosenNode) {
// do nothing here.
}
/* judge if a node is a good target.
* return true if <i>node</i> has enough space,
* does not have too much load, and the rack does not have too many nodes

View File

@ -0,0 +1,312 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
/** The class is responsible for choosing the desired number of targets
* for placing block replicas on environment with node-group layer.
* The replica placement strategy is adjusted to:
* If the writer is on a datanode, the 1st replica is placed on the local
* node (or local node-group), otherwise a random datanode.
* The 2nd replica is placed on a datanode that is on a different rack with 1st
* replica node.
* The 3rd replica is placed on a datanode which is on a different node-group
* but the same rack as the second replica node.
*/
public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
BlockPlacementPolicyWithNodeGroup(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap) {
initialize(conf, stats, clusterMap);
}
BlockPlacementPolicyWithNodeGroup() {
}
public void initialize(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap) {
super.initialize(conf, stats, clusterMap);
}
/** choose local node of localMachine as the target.
* if localMachine is not available, choose a node on the same nodegroup or
* rack instead.
* @return the chosen node
*/
@Override
protected DatanodeDescriptor chooseLocalNode(
DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
if (localMachine == null)
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes);
// otherwise try local machine first
Node oldNode = excludedNodes.put(localMachine, localMachine);
if (oldNode == null) { // was not in the excluded list
if (isGoodTarget(localMachine, blocksize,
maxNodesPerRack, false, results, avoidStaleNodes)) {
results.add(localMachine);
// Nodes under same nodegroup should be excluded.
addNodeGroupToExcludedNodes(excludedNodes,
localMachine.getNetworkLocation());
return localMachine;
}
}
// try a node on local node group
DatanodeDescriptor chosenNode = chooseLocalNodeGroup(
(NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes);
if (chosenNode != null) {
return chosenNode;
}
// try a node on local rack
return chooseLocalRack(localMachine, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes);
}
/**
* {@inheritDoc}
*/
@Override
protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
Node chosenNode) {
// as node-group aware implementation, it should make sure no two replica
// are placing on the same node group.
addNodeGroupToExcludedNodes(excludedNodes, chosenNode.getNetworkLocation());
}
// add all nodes under specific nodegroup to excludedNodes.
private void addNodeGroupToExcludedNodes(HashMap<Node, Node> excludedNodes,
String nodeGroup) {
List<Node> leafNodes = clusterMap.getLeaves(nodeGroup);
for (Node node : leafNodes) {
excludedNodes.put(node, node);
}
}
/**
* {@inheritDoc}
*/
@Override
protected DatanodeDescriptor chooseLocalRack(
DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes);
}
// choose one from the local rack, but off-nodegroup
try {
return chooseRandom(NetworkTopology.getFirstHalf(
localMachine.getNetworkLocation()),
excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes);
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
for(Iterator<DatanodeDescriptor> iter=results.iterator();
iter.hasNext();) {
DatanodeDescriptor nextNode = iter.next();
if (nextNode != localMachine) {
newLocal = nextNode;
break;
}
}
if (newLocal != null) {
try {
return chooseRandom(clusterMap.getRack(newLocal.getNetworkLocation()),
excludedNodes, blocksize, maxNodesPerRack, results,
avoidStaleNodes);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results,
avoidStaleNodes);
}
} else {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results,
avoidStaleNodes);
}
}
}
/**
* {@inheritDoc}
*/
@Override
protected void chooseRemoteRack(int numOfReplicas,
DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxReplicasPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
throws NotEnoughReplicasException {
int oldNumOfReplicas = results.size();
// randomly choose one node from remote racks
try {
chooseRandom(numOfReplicas, "~"+NetworkTopology.getFirstHalf(
localMachine.getNetworkLocation()),
excludedNodes, blocksize, maxReplicasPerRack, results, avoidStaleNodes);
} catch (NotEnoughReplicasException e) {
chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
localMachine.getNetworkLocation(), excludedNodes, blocksize,
maxReplicasPerRack, results, avoidStaleNodes);
}
}
/* choose one node from the nodegroup that <i>localMachine</i> is on.
* if no such node is available, choose one node from the nodegroup where
* a second replica is on.
* if still no such node is available, choose a random node in the cluster.
* @return the chosen node
*/
private DatanodeDescriptor chooseLocalNodeGroup(NetworkTopologyWithNodeGroup clusterMap,
DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes, long blocksize,
int maxNodesPerRack, List<DatanodeDescriptor> results, boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes);
}
// choose one from the local node group
try {
return chooseRandom(clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
for(Iterator<DatanodeDescriptor> iter=results.iterator();
iter.hasNext();) {
DatanodeDescriptor nextNode = iter.next();
if (nextNode != localMachine) {
newLocal = nextNode;
break;
}
}
if (newLocal != null) {
try {
return chooseRandom(clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
} catch(NotEnoughReplicasException e2) {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes);
}
} else {
//otherwise randomly choose one from the network
return chooseRandom(NodeBase.ROOT, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes);
}
}
}
@Override
protected String getRack(final DatanodeInfo cur) {
String nodeGroupString = cur.getNetworkLocation();
return NetworkTopology.getFirstHalf(nodeGroupString);
}
/**
* Pick up replica node set for deleting replica as over-replicated.
* First set contains replica nodes on rack with more than one
* replica while second set contains remaining replica nodes.
* If first is not empty, divide first set into two subsets:
* moreThanOne contains nodes on nodegroup with more than one replica
* exactlyOne contains the remaining nodes in first set
* then pickup priSet if not empty.
* If first is empty, then pick second.
*/
@Override
public Iterator<DatanodeDescriptor> pickupReplicaSet(
Collection<DatanodeDescriptor> first,
Collection<DatanodeDescriptor> second) {
// If no replica within same rack, return directly.
if (first.isEmpty()) {
return second.iterator();
}
// Split data nodes in the first set into two sets,
// moreThanOne contains nodes on nodegroup with more than one replica
// exactlyOne contains the remaining nodes
Map<String, List<DatanodeDescriptor>> nodeGroupMap =
new HashMap<String, List<DatanodeDescriptor>>();
for(DatanodeDescriptor node : first) {
final String nodeGroupName =
NetworkTopology.getLastHalf(node.getNetworkLocation());
List<DatanodeDescriptor> datanodeList =
nodeGroupMap.get(nodeGroupName);
if (datanodeList == null) {
datanodeList = new ArrayList<DatanodeDescriptor>();
nodeGroupMap.put(nodeGroupName, datanodeList);
}
datanodeList.add(node);
}
final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
// split nodes into two sets
for(List<DatanodeDescriptor> datanodeList : nodeGroupMap.values()) {
if (datanodeList.size() == 1 ) {
// exactlyOne contains nodes on nodegroup with exactly one replica
exactlyOne.add(datanodeList.get(0));
} else {
// moreThanOne contains nodes on nodegroup with more than one replica
moreThanOne.addAll(datanodeList);
}
}
Iterator<DatanodeDescriptor> iter =
moreThanOne.isEmpty() ? exactlyOne.iterator() : moreThanOne.iterator();
return iter;
}
}

View File

@ -0,0 +1,490 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.junit.Test;
public class TestReplicationPolicyWithNodeGroup extends TestCase {
private static final int BLOCK_SIZE = 1024;
private static final int NUM_OF_DATANODES = 8;
private static final Configuration CONF = new HdfsConfiguration();
private static final NetworkTopology cluster;
private static final NameNode namenode;
private static final BlockPlacementPolicy replicator;
private static final String filename = "/dummyfile.txt";
private final static DatanodeDescriptor dataNodes[] = new DatanodeDescriptor[] {
DFSTestUtil.getDatanodeDescriptor("1.1.1.1", "/d1/r1/n1"),
DFSTestUtil.getDatanodeDescriptor("2.2.2.2", "/d1/r1/n1"),
DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r1/n2"),
DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r2/n3"),
DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d1/r2/n3"),
DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d1/r2/n4"),
DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r3/n5"),
DFSTestUtil.getDatanodeDescriptor("8.8.8.8", "/d2/r3/n6")
};
private final static DatanodeDescriptor NODE =
new DatanodeDescriptor(DFSTestUtil.getDatanodeDescriptor("9.9.9.9", "/d2/r4/n7"));
static {
try {
FileSystem.setDefaultUri(CONF, "hdfs://localhost:0");
CONF.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
// Set properties to make HDFS aware of NodeGroup.
CONF.set("dfs.block.replicator.classname",
"org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup");
CONF.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
"org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
DFSTestUtil.formatNameNode(CONF);
namenode = new NameNode(CONF);
} catch (IOException e) {
e.printStackTrace();
throw (RuntimeException)new RuntimeException().initCause(e);
}
final BlockManager bm = namenode.getNamesystem().getBlockManager();
replicator = bm.getBlockPlacementPolicy();
cluster = bm.getDatanodeManager().getNetworkTopology();
// construct network topology
for(int i=0; i<NUM_OF_DATANODES; i++) {
cluster.add(dataNodes[i]);
}
setupDataNodeCapacity();
}
private static void setupDataNodeCapacity() {
for(int i=0; i<NUM_OF_DATANODES; i++) {
dataNodes[i].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
}
}
/**
* In this testcase, client is dataNodes[0]. So the 1st replica should be
* placed on dataNodes[0], the 2nd replica should be placed on
* different rack and third should be placed on different node (and node group)
* of rack chosen for 2nd node.
* The only excpetion is when the <i>numOfReplicas</i> is 2,
* the 1st is on dataNodes[0] and the 2nd is on a different rack.
* @throws Exception
*/
public void testChooseTarget1() throws Exception {
dataNodes[0].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename,
0, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename,
1, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 1);
assertEquals(targets[0], dataNodes[0]);
targets = replicator.chooseTarget(filename,
2, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 2);
assertEquals(targets[0], dataNodes[0]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
targets = replicator.chooseTarget(filename,
3, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 3);
assertEquals(targets[0], dataNodes[0]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
assertFalse(cluster.isOnSameNodeGroup(targets[1], targets[2]));
targets = replicator.chooseTarget(filename,
4, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 4);
assertEquals(targets[0], dataNodes[0]);
assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
cluster.isOnSameRack(targets[2], targets[3]));
assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
// Make sure no more than one replicas are on the same nodegroup
verifyNoTwoTargetsOnSameNodeGroup(targets);
dataNodes[0].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
}
private void verifyNoTwoTargetsOnSameNodeGroup(DatanodeDescriptor[] targets) {
Set<String> nodeGroupSet = new HashSet<String>();
for (DatanodeDescriptor target: targets) {
nodeGroupSet.add(target.getNetworkLocation());
}
assertEquals(nodeGroupSet.size(), targets.length);
}
/**
* In this testcase, client is dataNodes[0], but the dataNodes[1] is
* not allowed to be chosen. So the 1st replica should be
* placed on dataNodes[0], the 2nd replica should be placed on a different
* rack, the 3rd should be on same rack as the 2nd replica but in different
* node group, and the rest should be placed on a third rack.
* @throws Exception
*/
public void testChooseTarget2() throws Exception {
HashMap<Node, Node> excludedNodes;
DatanodeDescriptor[] targets;
BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
excludedNodes = new HashMap<Node, Node>();
excludedNodes.put(dataNodes[1], dataNodes[1]);
targets = repl.chooseTarget(4, dataNodes[0], chosenNodes, false,
excludedNodes, BLOCK_SIZE);
assertEquals(targets.length, 4);
assertEquals(targets[0], dataNodes[0]);
assertTrue(cluster.isNodeGroupAware());
// Make sure no replicas are on the same nodegroup
for (int i=1;i<4;i++) {
assertFalse(cluster.isOnSameNodeGroup(targets[0], targets[i]));
}
assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
cluster.isOnSameRack(targets[2], targets[3]));
assertFalse(cluster.isOnSameRack(targets[1], targets[3]));
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.put(dataNodes[1], dataNodes[1]);
chosenNodes.add(dataNodes[2]);
targets = repl.chooseTarget(1, dataNodes[0], chosenNodes, true,
excludedNodes, BLOCK_SIZE);
System.out.println("targets=" + Arrays.asList(targets));
assertEquals(2, targets.length);
//make sure that the chosen node is in the target.
int i = 0;
for(; i < targets.length && !dataNodes[2].equals(targets[i]); i++);
assertTrue(i < targets.length);
}
/**
* In this testcase, client is dataNodes[0], but dataNodes[0] is not qualified
* to be chosen. So the 1st replica should be placed on dataNodes[1],
* the 2nd replica should be placed on a different rack,
* the 3rd replica should be placed on the same rack as the 2nd replica but in different nodegroup,
* and the rest should be placed on the third rack.
* @throws Exception
*/
public void testChooseTarget3() throws Exception {
// make data node 0 to be not qualified to choose
dataNodes[0].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename,
0, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename,
1, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 1);
assertEquals(targets[0], dataNodes[1]);
targets = replicator.chooseTarget(filename,
2, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 2);
assertEquals(targets[0], dataNodes[1]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
targets = replicator.chooseTarget(filename,
3, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 3);
assertEquals(targets[0], dataNodes[1]);
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
targets = replicator.chooseTarget(filename,
4, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 4);
assertEquals(targets[0], dataNodes[1]);
assertTrue(cluster.isNodeGroupAware());
verifyNoTwoTargetsOnSameNodeGroup(targets);
assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
cluster.isOnSameRack(targets[2], targets[3]));
dataNodes[0].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
}
/**
* In this testcase, client is dataNodes[0], but none of the nodes on rack 1
* is qualified to be chosen. So the 1st replica should be placed on either
* rack 2 or rack 3.
* the 2nd replica should be placed on a different rack,
* the 3rd replica should be placed on the same rack as the 1st replica, but
* in different node group.
* @throws Exception
*/
public void testChooseTarget4() throws Exception {
// make data node 0-2 to be not qualified to choose: not enough disk space
for(int i=0; i<3; i++) {
dataNodes[i].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
}
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename,
0, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename,
1, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 1);
assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
targets = replicator.chooseTarget(filename,
2, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 2);
assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
targets = replicator.chooseTarget(filename,
3, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 3);
for(int i=0; i<3; i++) {
assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
}
verifyNoTwoTargetsOnSameNodeGroup(targets);
assertTrue(cluster.isOnSameRack(targets[0], targets[1]) ||
cluster.isOnSameRack(targets[1], targets[2]));
assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
}
/**
* In this testcase, client is is a node outside of file system.
* So the 1st replica can be placed on any node.
* the 2nd replica should be placed on a different rack,
* the 3rd replica should be placed on the same rack as the 2nd replica,
* @throws Exception
*/
public void testChooseTarget5() throws Exception {
setupDataNodeCapacity();
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename,
0, NODE, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename,
1, NODE, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 1);
targets = replicator.chooseTarget(filename,
2, NODE, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 2);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
targets = replicator.chooseTarget(filename,
3, NODE, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
assertEquals(targets.length, 3);
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
verifyNoTwoTargetsOnSameNodeGroup(targets);
}
/**
* This testcase tests re-replication, when dataNodes[0] is already chosen.
* So the 1st replica can be placed on random rack.
* the 2nd replica should be placed on different node and nodegroup by same rack as
* the 1st replica. The 3rd replica can be placed randomly.
* @throws Exception
*/
public void testRereplicate1() throws Exception {
setupDataNodeCapacity();
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
chosenNodes.add(dataNodes[0]);
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename,
0, dataNodes[0], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename,
1, dataNodes[0], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 1);
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
targets = replicator.chooseTarget(filename,
2, dataNodes[0], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 2);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
targets = replicator.chooseTarget(filename,
3, dataNodes[0], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 3);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
}
/**
* This testcase tests re-replication,
* when dataNodes[0] and dataNodes[1] are already chosen.
* So the 1st replica should be placed on a different rack of rack 1.
* the rest replicas can be placed randomly,
* @throws Exception
*/
public void testRereplicate2() throws Exception {
setupDataNodeCapacity();
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
chosenNodes.add(dataNodes[0]);
chosenNodes.add(dataNodes[1]);
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename,
0, dataNodes[0], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename,
1, dataNodes[0], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 1);
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
targets = replicator.chooseTarget(filename,
2, dataNodes[0], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 2);
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]) &&
cluster.isOnSameRack(dataNodes[0], targets[1]));
}
/**
* This testcase tests re-replication,
* when dataNodes[0] and dataNodes[3] are already chosen.
* So the 1st replica should be placed on the rack that the writer resides.
* the rest replicas can be placed randomly,
* @throws Exception
*/
public void testRereplicate3() throws Exception {
setupDataNodeCapacity();
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
chosenNodes.add(dataNodes[0]);
chosenNodes.add(dataNodes[3]);
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename,
0, dataNodes[0], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename,
1, dataNodes[0], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 1);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameRack(dataNodes[3], targets[0]));
targets = replicator.chooseTarget(filename,
1, dataNodes[3], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 1);
assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0]));
assertFalse(cluster.isOnSameNodeGroup(dataNodes[3], targets[0]));
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
targets = replicator.chooseTarget(filename,
2, dataNodes[0], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 2);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0]));
targets = replicator.chooseTarget(filename,
2, dataNodes[3], chosenNodes, BLOCK_SIZE);
assertEquals(targets.length, 2);
assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0]));
}
/**
* Test for the chooseReplicaToDelete are processed based on
* block locality and free space
*/
@Test
public void testChooseReplicaToDelete() throws Exception {
List<DatanodeDescriptor> replicaNodeList =
new ArrayList<DatanodeDescriptor>();
final Map<String, List<DatanodeDescriptor>> rackMap =
new HashMap<String, List<DatanodeDescriptor>>();
dataNodes[0].setRemaining(4*1024*1024);
replicaNodeList.add(dataNodes[0]);
dataNodes[1].setRemaining(3*1024*1024);
replicaNodeList.add(dataNodes[1]);
dataNodes[2].setRemaining(2*1024*1024);
replicaNodeList.add(dataNodes[2]);
dataNodes[5].setRemaining(1*1024*1024);
replicaNodeList.add(dataNodes[5]);
List<DatanodeDescriptor> first = new ArrayList<DatanodeDescriptor>();
List<DatanodeDescriptor> second = new ArrayList<DatanodeDescriptor>();
replicator.splitNodesWithRack(
replicaNodeList, rackMap, first, second);
assertEquals(3, first.size());
assertEquals(1, second.size());
DatanodeDescriptor chosenNode = replicator.chooseReplicaToDelete(
null, null, (short)3, first, second);
// Within first set {dataNodes[0], dataNodes[1], dataNodes[2]},
// dataNodes[0] and dataNodes[1] are in the same nodegroup,
// but dataNodes[1] is chosen as less free space
assertEquals(chosenNode, dataNodes[1]);
replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosenNode);
assertEquals(2, first.size());
assertEquals(1, second.size());
// Within first set {dataNodes[0], dataNodes[2]}, dataNodes[2] is chosen
// as less free space
chosenNode = replicator.chooseReplicaToDelete(
null, null, (short)2, first, second);
assertEquals(chosenNode, dataNodes[2]);
replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosenNode);
assertEquals(0, first.size());
assertEquals(2, second.size());
// Within second set, dataNodes[5] with less free space
chosenNode = replicator.chooseReplicaToDelete(
null, null, (short)1, first, second);
assertEquals(chosenNode, dataNodes[5]);
}
}