svn merge -c 1523400 from trunk for HDFS-5188. In BlockPlacementPolicy, reduce the number of chooseTarget(..) methods; replace HashMap with Map in parameter declarations and cleanup some related code.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1523401 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2013-09-15 04:25:50 +00:00
parent c5ef30971e
commit 9c98bb8175
12 changed files with 339 additions and 505 deletions
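The signature change at the core of this patch is the replacement of HashMap<Node, Node> with the Map interface in the excludedNodes parameters, so callers may pass any map implementation while the policy classes keep allocating HashMaps internally where they need one. A minimal, self-contained sketch of that pattern (simplified types, mirroring the addToExcludedNodes contract shown in the hunks below, not the actual HDFS signatures):

import java.util.HashMap;
import java.util.Map;

class ExcludedNodesSketch {
  // The parameter is declared against Map rather than HashMap; callers remain
  // free to pass a HashMap (or any other Map implementation).
  static <N> int addToExcluded(Map<N, N> excludedNodes, N node) {
    // returns 1 if the node was newly excluded, 0 if it was already present
    return excludedNodes.put(node, node) == null ? 1 : 0;
  }

  public static void main(String[] args) {
    Map<String, String> excluded = new HashMap<String, String>();
    System.out.println(addToExcluded(excluded, "dn1"));  // 1: newly excluded
    System.out.println(addToExcluded(excluded, "dn1"));  // 0: already present
  }
}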

View File

@ -37,6 +37,10 @@ Release 2.3.0 - UNRELEASED
HDFS-4096. Add snapshot information to namenode WebUI. (Haohui Mai via
jing9)
HDFS-5188. In BlockPlacementPolicy, reduce the number of chooseTarget(..)
methods; replace HashMap with Map in parameter declarations and cleanup
some related code. (szetszwo)
OPTIMIZATIONS
BUG FIXES

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
/**
* This class contains constants for configuration keys used
@ -348,6 +349,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY = "dfs.block.access.token.lifetime";
public static final long DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT = 600L;
public static final String DFS_BLOCK_REPLICATOR_CLASSNAME_KEY = "dfs.block.replicator.classname";
public static final Class<BlockPlacementPolicyDefault> DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT = BlockPlacementPolicyDefault.class;
public static final String DFS_REPLICATION_MAX_KEY = "dfs.replication.max";
public static final int DFS_REPLICATION_MAX_DEFAULT = 512;
public static final String DFS_DF_INTERVAL_KEY = "dfs.df.interval";

View File

@ -1258,8 +1258,7 @@ public class BlockManager {
namesystem.writeUnlock();
}
HashMap<Node, Node> excludedNodes
= new HashMap<Node, Node>();
final Map<Node, Node> excludedNodes = new HashMap<Node, Node>();
for(ReplicationWork rw : work){
// Exclude all of the containing nodes from being targets.
// This list includes decommissioning or corrupt nodes.
@ -1271,9 +1270,7 @@ public class BlockManager {
// choose replication targets: NOT HOLDING THE GLOBAL LOCK
// It is costly to extract the filename for which chooseTargets is called,
// so for now we pass in the block collection itself.
rw.targets = blockplacement.chooseTarget(rw.bc,
rw.additionalReplRequired, rw.srcNode, rw.liveReplicaNodes,
excludedNodes, rw.block.getNumBytes());
rw.chooseTargets(blockplacement, excludedNodes);
}
namesystem.writeLock();
@ -3238,6 +3235,13 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
this.priority = priority;
this.targets = null;
}
private void chooseTargets(BlockPlacementPolicy blockplacement,
Map<Node, Node> excludedNodes) {
targets = blockplacement.chooseTarget(bc.getName(),
additionalReplRequired, srcNode, liveReplicaNodes, false,
excludedNodes, block.getNumBytes());
}
}
/**

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -27,6 +26,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@ -51,25 +51,6 @@ public abstract class BlockPlacementPolicy {
}
}
/**
* choose <i>numOfReplicas</i> data nodes for <i>writer</i>
* to re-replicate a block with size <i>blocksize</i>
* If not, return as many as we can.
*
* @param srcPath the file to which this chooseTargets is being invoked.
* @param numOfReplicas additional number of replicas wanted.
* @param writer the writer's machine, null if not in the cluster.
* @param chosenNodes datanodes that have been chosen as targets.
* @param blocksize size of the data to be written.
* @return array of DatanodeDescriptor instances chosen as target
* and sorted as a pipeline.
*/
abstract DatanodeDescriptor[] chooseTarget(String srcPath,
int numOfReplicas,
DatanodeDescriptor writer,
List<DatanodeDescriptor> chosenNodes,
long blocksize);
/**
* choose <i>numOfReplicas</i> data nodes for <i>writer</i>
* to re-replicate a block with size <i>blocksize</i>
@ -90,34 +71,8 @@ public abstract class BlockPlacementPolicy {
DatanodeDescriptor writer,
List<DatanodeDescriptor> chosenNodes,
boolean returnChosenNodes,
HashMap<Node, Node> excludedNodes,
Map<Node, Node> excludedNodes,
long blocksize);
/**
* choose <i>numOfReplicas</i> data nodes for <i>writer</i>
* If not, return as many as we can.
* The base implementation extracts the pathname of the file from the
* specified srcBC, but this could be a costly operation depending on the
* file system implementation. Concrete implementations of this class should
* override this method to avoid this overhead.
*
* @param srcBC block collection of file for which chooseTarget is invoked.
* @param numOfReplicas additional number of replicas wanted.
* @param writer the writer's machine, null if not in the cluster.
* @param chosenNodes datanodes that have been chosen as targets.
* @param blocksize size of the data to be written.
* @return array of DatanodeDescriptor instances chosen as target
* and sorted as a pipeline.
*/
DatanodeDescriptor[] chooseTarget(BlockCollection srcBC,
int numOfReplicas,
DatanodeDescriptor writer,
List<DatanodeDescriptor> chosenNodes,
HashMap<Node, Node> excludedNodes,
long blocksize) {
return chooseTarget(srcBC.getName(), numOfReplicas, writer,
chosenNodes, false, excludedNodes, blocksize);
}
/**
* Same as {@link #chooseTarget(String, int, DatanodeDescriptor, List, boolean,
@ -128,7 +83,7 @@ public abstract class BlockPlacementPolicy {
*/
DatanodeDescriptor[] chooseTarget(String src,
int numOfReplicas, DatanodeDescriptor writer,
HashMap<Node, Node> excludedNodes,
Map<Node, Node> excludedNodes,
long blocksize, List<DatanodeDescriptor> favoredNodes) {
// This class does not provide the functionality of placing
// a block in favored datanodes. The implementations of this class
@ -183,7 +138,7 @@ public abstract class BlockPlacementPolicy {
/**
* Get an instance of the configured Block Placement Policy based on the
* value of the configuration parameter dfs.block.replicator.classname.
* the configuration property {@link DFS_BLOCK_REPLICATOR_CLASSNAME_KEY}.
*
* @param conf the configuration to be used
* @param stats an object that is used to retrieve the load on the cluster
@ -193,12 +148,12 @@ public abstract class BlockPlacementPolicy {
public static BlockPlacementPolicy getInstance(Configuration conf,
FSClusterStats stats,
NetworkTopology clusterMap) {
Class<? extends BlockPlacementPolicy> replicatorClass =
conf.getClass("dfs.block.replicator.classname",
BlockPlacementPolicyDefault.class,
BlockPlacementPolicy.class);
BlockPlacementPolicy replicator = (BlockPlacementPolicy) ReflectionUtils.newInstance(
replicatorClass, conf);
final Class<? extends BlockPlacementPolicy> replicatorClass = conf.getClass(
DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
BlockPlacementPolicy.class);
final BlockPlacementPolicy replicator = ReflectionUtils.newInstance(
replicatorClass, conf);
replicator.initialize(conf, stats, clusterMap);
return replicator;
}
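
As a usage sketch of the getInstance path above (grounded in the constants this patch introduces; the FSClusterStats and NetworkTopology arguments are normally supplied by the namenode, and selecting the node-group policy here is only an example):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup;
import org.apache.hadoop.hdfs.server.blockmanagement.FSClusterStats;
import org.apache.hadoop.net.NetworkTopology;

class PolicySelectionSketch {
  static BlockPlacementPolicy select(Configuration conf, FSClusterStats stats,
      NetworkTopology clusterMap) {
    // With the key left unset, getInstance falls back to
    // DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT (BlockPlacementPolicyDefault).
    conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
        BlockPlacementPolicyWithNodeGroup.class, BlockPlacementPolicy.class);
    return BlockPlacementPolicy.getInstance(conf, stats, clusterMap);
  }
}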

View File

@ -22,8 +22,8 @@ import static org.apache.hadoop.util.Time.now;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
@ -57,6 +57,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
"For more information, please enable DEBUG log level on "
+ BlockPlacementPolicy.class.getName();
private static final ThreadLocal<StringBuilder> debugLoggingBuilder
= new ThreadLocal<StringBuilder>() {
@Override
protected StringBuilder initialValue() {
return new StringBuilder();
}
};
protected boolean considerLoad;
private boolean preferLocalNode = true;
protected NetworkTopology clusterMap;
@ -95,40 +103,25 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
}
protected ThreadLocal<StringBuilder> threadLocalBuilder =
new ThreadLocal<StringBuilder>() {
@Override
protected StringBuilder initialValue() {
return new StringBuilder();
}
};
@Override
public DatanodeDescriptor[] chooseTarget(String srcPath,
int numOfReplicas,
DatanodeDescriptor writer,
List<DatanodeDescriptor> chosenNodes,
long blocksize) {
return chooseTarget(numOfReplicas, writer, chosenNodes, false,
null, blocksize);
}
@Override
public DatanodeDescriptor[] chooseTarget(String srcPath,
int numOfReplicas,
DatanodeDescriptor writer,
List<DatanodeDescriptor> chosenNodes,
boolean returnChosenNodes,
HashMap<Node, Node> excludedNodes,
Map<Node, Node> excludedNodes,
long blocksize) {
return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
excludedNodes, blocksize);
}
@Override
DatanodeDescriptor[] chooseTarget(String src, int numOfReplicas,
DatanodeDescriptor writer, HashMap<Node, Node> excludedNodes,
long blocksize, List<DatanodeDescriptor> favoredNodes) {
DatanodeDescriptor[] chooseTarget(String src,
int numOfReplicas,
DatanodeDescriptor writer,
Map<Node, Node> excludedNodes,
long blocksize,
List<DatanodeDescriptor> favoredNodes) {
try {
if (favoredNodes == null || favoredNodes.size() == 0) {
// Favored nodes not specified, fall back to regular block placement.
@ -137,7 +130,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
excludedNodes, blocksize);
}
HashMap<Node, Node> favoriteAndExcludedNodes = excludedNodes == null ?
Map<Node, Node> favoriteAndExcludedNodes = excludedNodes == null ?
new HashMap<Node, Node>() : new HashMap<Node, Node>(excludedNodes);
// Choose favored nodes
@ -181,14 +174,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
}
/** This is the implementation. */
DatanodeDescriptor[] chooseTarget(int numOfReplicas,
private DatanodeDescriptor[] chooseTarget(int numOfReplicas,
DatanodeDescriptor writer,
List<DatanodeDescriptor> chosenNodes,
boolean returnChosenNodes,
HashMap<Node, Node> excludedNodes,
Map<Node, Node> excludedNodes,
long blocksize) {
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
return new DatanodeDescriptor[0];
return DatanodeDescriptor.EMPTY_ARRAY;
}
if (excludedNodes == null) {
@ -204,7 +197,6 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
for (DatanodeDescriptor node:chosenNodes) {
// add localMachine and related nodes to excludedNodes
addToExcludedNodes(node, excludedNodes);
adjustExcludedNodes(excludedNodes, node);
}
if (!clusterMap.contains(writer)) {
@ -239,7 +231,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
/* choose <i>numOfReplicas</i> from all data nodes */
private DatanodeDescriptor chooseTarget(int numOfReplicas,
DatanodeDescriptor writer,
HashMap<Node, Node> excludedNodes,
Map<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results,
@ -256,7 +248,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
}
// Keep a copy of original excludedNodes
final HashMap<Node, Node> oldExcludedNodes = avoidStaleNodes ?
final Map<Node, Node> oldExcludedNodes = avoidStaleNodes ?
new HashMap<Node, Node>(excludedNodes) : null;
try {
if (numOfResults == 0) {
@ -316,19 +308,19 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
return writer;
}
/* choose <i>localMachine</i> as the target.
/**
* Choose <i>localMachine</i> as the target.
* if <i>localMachine</i> is not available,
* choose a node on the same rack
* @return the chosen node
*/
protected DatanodeDescriptor chooseLocalNode(
DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes,
protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
Map<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
throws NotEnoughReplicasException {
throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
if (localMachine == null)
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
@ -337,11 +329,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
// otherwise try local machine first
Node oldNode = excludedNodes.put(localMachine, localMachine);
if (oldNode == null) { // was not in the excluded list
if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false,
results, avoidStaleNodes)) {
results.add(localMachine);
// add localMachine and related nodes to excludedNode
addToExcludedNodes(localMachine, excludedNodes);
if (addIfIsGoodTarget(localMachine, excludedNodes, blocksize,
maxNodesPerRack, false, results, avoidStaleNodes) >= 0) {
return localMachine;
}
}
@ -358,26 +347,26 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
* @return number of new excluded nodes
*/
protected int addToExcludedNodes(DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes) {
Map<Node, Node> excludedNodes) {
Node node = excludedNodes.put(localMachine, localMachine);
return node == null?1:0;
}
/* choose one node from the rack that <i>localMachine</i> is on.
/**
* Choose one node from the rack that <i>localMachine</i> is on.
* if no such node is available, choose one node from the rack where
* a second replica is on.
* if still no such node is available, choose a random node
* in the cluster.
* @return the chosen node
*/
protected DatanodeDescriptor chooseLocalRack(
DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes,
protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
Map<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
throws NotEnoughReplicasException {
throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
@ -391,9 +380,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
for(Iterator<DatanodeDescriptor> iter=results.iterator();
iter.hasNext();) {
DatanodeDescriptor nextNode = iter.next();
for(DatanodeDescriptor nextNode : results) {
if (nextNode != localMachine) {
newLocal = nextNode;
break;
@ -416,7 +403,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
}
}
/* choose <i>numOfReplicas</i> nodes from the racks
/**
* Choose <i>numOfReplicas</i> nodes from the racks
* that <i>localMachine</i> is NOT on.
* if not enough nodes are available, choose the remaining ones
* from the local rack
@ -424,12 +412,12 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
protected void chooseRemoteRack(int numOfReplicas,
DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes,
Map<Node, Node> excludedNodes,
long blocksize,
int maxReplicasPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
throws NotEnoughReplicasException {
throws NotEnoughReplicasException {
int oldNumOfReplicas = results.size();
// randomly choose one node from remote racks
try {
@ -443,91 +431,59 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
}
}
/* Randomly choose one target from <i>nodes</i>.
* @return the chosen node
/**
* Randomly choose one target from the given <i>scope</i>.
* @return the chosen node, if there is any.
*/
protected DatanodeDescriptor chooseRandom(
String nodes,
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
throws NotEnoughReplicasException {
int numOfAvailableNodes =
clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
StringBuilder builder = null;
if (LOG.isDebugEnabled()) {
builder = threadLocalBuilder.get();
builder.setLength(0);
builder.append("[");
}
boolean badTarget = false;
while(numOfAvailableNodes > 0) {
DatanodeDescriptor chosenNode =
(DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
Node oldNode = excludedNodes.put(chosenNode, chosenNode);
if (oldNode == null) { // chosenNode was not in the excluded list
numOfAvailableNodes--;
if (isGoodTarget(chosenNode, blocksize,
maxNodesPerRack, results, avoidStaleNodes)) {
results.add(chosenNode);
// add chosenNode and related nodes to excludedNode
addToExcludedNodes(chosenNode, excludedNodes);
adjustExcludedNodes(excludedNodes, chosenNode);
return chosenNode;
} else {
badTarget = true;
}
}
}
String detail = enableDebugLogging;
if (LOG.isDebugEnabled()) {
if (badTarget && builder != null) {
detail = builder.append("]").toString();
builder.setLength(0);
} else detail = "";
}
throw new NotEnoughReplicasException(detail);
protected DatanodeDescriptor chooseRandom(String scope,
Map<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
throws NotEnoughReplicasException {
return chooseRandom(1, scope, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes);
}
/* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
/**
* Randomly choose <i>numOfReplicas</i> targets from the given <i>scope</i>.
* @return the first chosen node, if there is any.
*/
protected void chooseRandom(int numOfReplicas,
String nodes,
HashMap<Node, Node> excludedNodes,
protected DatanodeDescriptor chooseRandom(int numOfReplicas,
String scope,
Map<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
throws NotEnoughReplicasException {
throws NotEnoughReplicasException {
int numOfAvailableNodes =
clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(
scope, excludedNodes.keySet());
StringBuilder builder = null;
if (LOG.isDebugEnabled()) {
builder = threadLocalBuilder.get();
builder = debugLoggingBuilder.get();
builder.setLength(0);
builder.append("[");
}
boolean badTarget = false;
DatanodeDescriptor firstChosen = null;
while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
DatanodeDescriptor chosenNode =
(DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
(DatanodeDescriptor)clusterMap.chooseRandom(scope);
Node oldNode = excludedNodes.put(chosenNode, chosenNode);
if (oldNode == null) {
numOfAvailableNodes--;
if (isGoodTarget(chosenNode, blocksize,
maxNodesPerRack, results, avoidStaleNodes)) {
int newExcludedNodes = addIfIsGoodTarget(chosenNode, excludedNodes,
blocksize, maxNodesPerRack, considerLoad, results, avoidStaleNodes);
if (newExcludedNodes >= 0) {
numOfReplicas--;
results.add(chosenNode);
// add chosenNode and related nodes to excludedNode
int newExcludedNodes = addToExcludedNodes(chosenNode, excludedNodes);
if (firstChosen == null) {
firstChosen = chosenNode;
}
numOfAvailableNodes -= newExcludedNodes;
adjustExcludedNodes(excludedNodes, chosenNode);
} else {
badTarget = true;
}
@ -544,34 +500,44 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
}
throw new NotEnoughReplicasException(detail);
}
}
/**
* After choosing a node to place replica, adjust excluded nodes accordingly.
* It should do nothing here as chosenNode is already put into excludedNodes,
* but it can be overridden in subclass to put more related nodes into
* excludedNodes.
*
* @param excludedNodes
* @param chosenNode
*/
protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
Node chosenNode) {
// do nothing here.
return firstChosen;
}
/* judge if a node is a good target.
* return true if <i>node</i> has enough space,
* does not have too much load, and the rack does not have too many nodes
/**
* If the given node is a good target, add it to the result list and
* update the excluded node map.
* @return -1 if the given is not a good target;
* otherwise, return the number of excluded nodes added to the map.
*/
private boolean isGoodTarget(DatanodeDescriptor node,
long blockSize, int maxTargetPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes) {
return isGoodTarget(node, blockSize, maxTargetPerRack, this.considerLoad,
results, avoidStaleNodes);
int addIfIsGoodTarget(DatanodeDescriptor node,
Map<Node, Node> excludedNodes,
long blockSize,
int maxNodesPerRack,
boolean considerLoad,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes) {
if (isGoodTarget(node, blockSize, maxNodesPerRack, considerLoad,
results, avoidStaleNodes)) {
results.add(node);
// add node and related nodes to excludedNode
return addToExcludedNodes(node, excludedNodes);
} else {
return -1;
}
}
private static void logNodeIsNotChosen(DatanodeDescriptor node, String reason) {
if (LOG.isDebugEnabled()) {
// build the error message for later use.
debugLoggingBuilder.get()
.append(node).append(": ")
.append("Node ").append(NodeBase.getPath(node))
.append(" is not chosen because ")
.append(reason);
}
}
/**
* Determine if a node is a good target.
*
@ -588,28 +554,20 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
* does not have too much load,
* and the rack does not have too many nodes.
*/
protected boolean isGoodTarget(DatanodeDescriptor node,
private boolean isGoodTarget(DatanodeDescriptor node,
long blockSize, int maxTargetPerRack,
boolean considerLoad,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes) {
// check if the node is (being) decommissed
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
if(LOG.isDebugEnabled()) {
threadLocalBuilder.get().append(node.toString()).append(": ")
.append("Node ").append(NodeBase.getPath(node))
.append(" is not chosen because the node is (being) decommissioned ");
}
logNodeIsNotChosen(node, "the node is (being) decommissioned ");
return false;
}
if (avoidStaleNodes) {
if (node.isStale(this.staleInterval)) {
if (LOG.isDebugEnabled()) {
threadLocalBuilder.get().append(node.toString()).append(": ")
.append("Node ").append(NodeBase.getPath(node))
.append(" is not chosen because the node is stale ");
}
logNodeIsNotChosen(node, "the node is stale ");
return false;
}
}
@ -618,11 +576,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
(node.getBlocksScheduled() * blockSize);
// check the remaining capacity of the target machine
if (blockSize* HdfsConstants.MIN_BLOCKS_FOR_WRITE>remaining) {
if(LOG.isDebugEnabled()) {
threadLocalBuilder.get().append(node.toString()).append(": ")
.append("Node ").append(NodeBase.getPath(node))
.append(" is not chosen because the node does not have enough space ");
}
logNodeIsNotChosen(node, "the node does not have enough space ");
return false;
}
@ -634,11 +588,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
avgLoad = (double)stats.getTotalLoad()/size;
}
if (node.getXceiverCount() > (2.0 * avgLoad)) {
if(LOG.isDebugEnabled()) {
threadLocalBuilder.get().append(node.toString()).append(": ")
.append("Node ").append(NodeBase.getPath(node))
.append(" is not chosen because the node is too busy ");
}
logNodeIsNotChosen(node, "the node is too busy ");
return false;
}
}
@ -646,31 +596,25 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
// check if the target rack has chosen too many nodes
String rackname = node.getNetworkLocation();
int counter=1;
for(Iterator<DatanodeDescriptor> iter = results.iterator();
iter.hasNext();) {
Node result = iter.next();
for(Node result : results) {
if (rackname.equals(result.getNetworkLocation())) {
counter++;
}
}
if (counter>maxTargetPerRack) {
if(LOG.isDebugEnabled()) {
threadLocalBuilder.get().append(node.toString()).append(": ")
.append("Node ").append(NodeBase.getPath(node))
.append(" is not chosen because the rack has too many chosen nodes ");
}
logNodeIsNotChosen(node, "the rack has too many chosen nodes ");
return false;
}
return true;
}
/* Return a pipeline of nodes.
/**
* Return a pipeline of nodes.
* The pipeline is formed finding a shortest path that
* starts from the writer and traverses all <i>nodes</i>
* This is basically a traveling salesman problem.
*/
private DatanodeDescriptor[] getPipeline(
DatanodeDescriptor writer,
private DatanodeDescriptor[] getPipeline(DatanodeDescriptor writer,
DatanodeDescriptor[] nodes) {
if (nodes.length==0) return nodes;
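
The remainder of getPipeline is elided in this hunk. As a rough, self-contained sketch of the greedy nearest-neighbor ordering the comment describes (distance here is only a stand-in for NetworkTopology.getDistance; this is not the actual HDFS implementation, which reorders the array in place):

import java.util.ArrayList;
import java.util.List;

class PipelineSketch {
  interface Distance<T> {
    int between(T a, T b);
  }

  // Greedy ordering: starting from the writer, repeatedly append the closest
  // remaining node and continue from the node just chosen.
  static <T> List<T> order(T writer, List<T> nodes, Distance<T> distance) {
    List<T> remaining = new ArrayList<T>(nodes);
    List<T> pipeline = new ArrayList<T>(nodes.size());
    T current = writer;
    while (!remaining.isEmpty()) {
      T best = remaining.get(0);
      for (T candidate : remaining) {
        if (distance.between(current, candidate)
            < distance.between(current, best)) {
          best = candidate;
        }
      }
      remaining.remove(best);
      pipeline.add(best);
      current = best;
    }
    return pipeline;
  }
}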
@ -709,7 +653,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
int minRacks) {
DatanodeInfo[] locs = lBlk.getLocations();
if (locs == null)
locs = new DatanodeInfo[0];
locs = DatanodeDescriptor.EMPTY_ARRAY;
int numRacks = clusterMap.getNumOfRacks();
if(numRacks <= 1) // only one rack
return 0;
@ -724,24 +668,18 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
@Override
public DatanodeDescriptor chooseReplicaToDelete(BlockCollection bc,
Block block,
short replicationFactor,
Collection<DatanodeDescriptor> first,
Collection<DatanodeDescriptor> second) {
Block block, short replicationFactor,
Collection<DatanodeDescriptor> first,
Collection<DatanodeDescriptor> second) {
long oldestHeartbeat =
now() - heartbeatInterval * tolerateHeartbeatMultiplier;
DatanodeDescriptor oldestHeartbeatNode = null;
long minSpace = Long.MAX_VALUE;
DatanodeDescriptor minSpaceNode = null;
// pick replica from the first Set. If first is empty, then pick replicas
// from second set.
Iterator<DatanodeDescriptor> iter = pickupReplicaSet(first, second);
// Pick the node with the oldest heartbeat or with the least free space,
// if all heartbeats are within the tolerable heartbeat interval
while (iter.hasNext() ) {
DatanodeDescriptor node = iter.next();
for(DatanodeDescriptor node : pickupReplicaSet(first, second)) {
long free = node.getRemaining();
long lastHeartbeat = node.getLastUpdate();
if(lastHeartbeat < oldestHeartbeat) {
@ -762,12 +700,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
* replica while second set contains remaining replica nodes.
* So pick up first set if not empty. If first is empty, then pick second.
*/
protected Iterator<DatanodeDescriptor> pickupReplicaSet(
protected Collection<DatanodeDescriptor> pickupReplicaSet(
Collection<DatanodeDescriptor> first,
Collection<DatanodeDescriptor> second) {
Iterator<DatanodeDescriptor> iter =
first.isEmpty() ? second.iterator() : first.iterator();
return iter;
return first.isEmpty() ? second : first;
}
@VisibleForTesting

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -63,13 +62,9 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
* @return the chosen node
*/
@Override
protected DatanodeDescriptor chooseLocalNode(
DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
Map<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
List<DatanodeDescriptor> results, boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// if no local machine, randomly choose one node
if (localMachine == null)
@ -79,12 +74,8 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
// otherwise try local machine first
Node oldNode = excludedNodes.put(localMachine, localMachine);
if (oldNode == null) { // was not in the excluded list
if (isGoodTarget(localMachine, blocksize,
maxNodesPerRack, false, results, avoidStaleNodes)) {
results.add(localMachine);
// Nodes under same nodegroup should be excluded.
addNodeGroupToExcludedNodes(excludedNodes,
localMachine.getNetworkLocation());
if (addIfIsGoodTarget(localMachine, excludedNodes, blocksize,
maxNodesPerRack, false, results, avoidStaleNodes) >= 0) {
return localMachine;
}
}
@ -105,34 +96,10 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
* {@inheritDoc}
*/
@Override
protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
Node chosenNode) {
// as node-group aware implementation, it should make sure no two replica
// are placing on the same node group.
addNodeGroupToExcludedNodes(excludedNodes, chosenNode.getNetworkLocation());
}
// add all nodes under specific nodegroup to excludedNodes.
private void addNodeGroupToExcludedNodes(HashMap<Node, Node> excludedNodes,
String nodeGroup) {
List<Node> leafNodes = clusterMap.getLeaves(nodeGroup);
for (Node node : leafNodes) {
excludedNodes.put(node, node);
}
}
/**
* {@inheritDoc}
*/
@Override
protected DatanodeDescriptor chooseLocalRack(
DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxNodesPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
throws NotEnoughReplicasException {
protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
Map<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
List<DatanodeDescriptor> results, boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes,
@ -148,9 +115,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
for(Iterator<DatanodeDescriptor> iter=results.iterator();
iter.hasNext();) {
DatanodeDescriptor nextNode = iter.next();
for(DatanodeDescriptor nextNode : results) {
if (nextNode != localMachine) {
newLocal = nextNode;
break;
@ -181,13 +146,9 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
*/
@Override
protected void chooseRemoteRack(int numOfReplicas,
DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes,
long blocksize,
int maxReplicasPerRack,
List<DatanodeDescriptor> results,
boolean avoidStaleNodes)
throws NotEnoughReplicasException {
DatanodeDescriptor localMachine, Map<Node, Node> excludedNodes,
long blocksize, int maxReplicasPerRack, List<DatanodeDescriptor> results,
boolean avoidStaleNodes) throws NotEnoughReplicasException {
int oldNumOfReplicas = results.size();
final String rackLocation = NetworkTopology.getFirstHalf(
@ -210,10 +171,11 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
* if still no such node is available, choose a random node in the cluster.
* @return the chosen node
*/
private DatanodeDescriptor chooseLocalNodeGroup(NetworkTopologyWithNodeGroup clusterMap,
DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes, long blocksize,
int maxNodesPerRack, List<DatanodeDescriptor> results, boolean avoidStaleNodes)
throws NotEnoughReplicasException {
private DatanodeDescriptor chooseLocalNodeGroup(
NetworkTopologyWithNodeGroup clusterMap, DatanodeDescriptor localMachine,
Map<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
List<DatanodeDescriptor> results, boolean avoidStaleNodes)
throws NotEnoughReplicasException {
// no local machine, so choose a random machine
if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes,
@ -227,9 +189,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
} catch (NotEnoughReplicasException e1) {
// find the second replica
DatanodeDescriptor newLocal=null;
for(Iterator<DatanodeDescriptor> iter=results.iterator();
iter.hasNext();) {
DatanodeDescriptor nextNode = iter.next();
for(DatanodeDescriptor nextNode : results) {
if (nextNode != localMachine) {
newLocal = nextNode;
break;
@ -264,10 +224,11 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
* within the same nodegroup
* @return number of new excluded nodes
*/
protected int addToExcludedNodes(DatanodeDescriptor localMachine,
HashMap<Node, Node> excludedNodes) {
@Override
protected int addToExcludedNodes(DatanodeDescriptor chosenNode,
Map<Node, Node> excludedNodes) {
int countOfExcludedNodes = 0;
String nodeGroupScope = localMachine.getNetworkLocation();
String nodeGroupScope = chosenNode.getNetworkLocation();
List<Node> leafNodes = clusterMap.getLeaves(nodeGroupScope);
for (Node leafNode : leafNodes) {
Node node = excludedNodes.put(leafNode, leafNode);
@ -290,12 +251,12 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
* If first is empty, then pick second.
*/
@Override
public Iterator<DatanodeDescriptor> pickupReplicaSet(
public Collection<DatanodeDescriptor> pickupReplicaSet(
Collection<DatanodeDescriptor> first,
Collection<DatanodeDescriptor> second) {
// If no replica within same rack, return directly.
if (first.isEmpty()) {
return second.iterator();
return second;
}
// Split data nodes in the first set into two sets,
// moreThanOne contains nodes on nodegroup with more than one replica
@ -328,9 +289,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
}
}
Iterator<DatanodeDescriptor> iter =
moreThanOne.isEmpty() ? exactlyOne.iterator() : moreThanOne.iterator();
return iter;
return moreThanOne.isEmpty()? exactlyOne : moreThanOne;
}
}
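
The bucketing between these hunks is elided. A simplified sketch of the idea, where groupOf stands in for deriving the node-group portion of a node's network location (an assumption for illustration, not the actual HDFS code):

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class NodeGroupBucketSketch {
  interface GroupFunction<N> {
    String apply(N node);
  }

  // Prefer nodes whose node group holds more than one replica; fall back to
  // nodes whose node group holds exactly one.
  static <N> Collection<N> pickup(Collection<N> first, Collection<N> second,
      GroupFunction<N> groupOf) {
    if (first.isEmpty()) {
      return second;
    }
    Map<String, List<N>> byGroup = new HashMap<String, List<N>>();
    for (N node : first) {
      String group = groupOf.apply(node);
      List<N> bucket = byGroup.get(group);
      if (bucket == null) {
        bucket = new ArrayList<N>();
        byGroup.put(group, bucket);
      }
      bucket.add(node);
    }
    List<N> moreThanOne = new ArrayList<N>();
    List<N> exactlyOne = new ArrayList<N>();
    for (List<N> bucket : byGroup.values()) {
      (bucket.size() > 1 ? moreThanOne : exactlyOne).addAll(bucket);
    }
    return moreThanOne.isEmpty() ? exactlyOne : moreThanOne;
  }
}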

View File

@ -42,7 +42,8 @@ import org.apache.hadoop.util.Time;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class DatanodeDescriptor extends DatanodeInfo {
public static final DatanodeDescriptor[] EMPTY_ARRAY = {};
// Stores status of decommissioning.
// If node is not decommissioning, do not use this object for anything.
public DecommissioningStatus decommissioningStatus = new DecommissioningStatus();

View File

@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@ -39,9 +40,11 @@ import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
import org.junit.Assert;
import org.junit.Test;
import junit.framework.Assert;
/**
* This class tests if a balancer schedules tasks correctly.
@ -75,10 +78,9 @@ public class TestBalancerWithNodeGroup {
Configuration conf = new HdfsConfiguration();
TestBalancer.initConf(conf);
conf.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
"org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
conf.set("dfs.block.replicator.classname",
"org.apache.hadoop.hdfs.server.blockmanagement." +
"BlockPlacementPolicyWithNodeGroup");
NetworkTopologyWithNodeGroup.class.getName());
conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
BlockPlacementPolicyWithNodeGroup.class.getName());
return conf;
}

View File

@ -157,8 +157,8 @@ public class TestRBWBlockInvalidation {
// in the context of the test, whereas a random one is more accurate
// to what is seen in real clusters (nodes have random amounts of free
// space)
conf.setClass("dfs.block.replicator.classname", RandomDeleterPolicy.class,
BlockPlacementPolicy.class);
conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
RandomDeleterPolicy.class, BlockPlacementPolicy.class);
// Speed up the test a bit with faster heartbeats.
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);

View File

@ -128,30 +128,25 @@ public class TestReplicationPolicy {
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename, 0, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename, 1, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
assertEquals(targets[0], dataNodes[0]);
targets = replicator.chooseTarget(filename,
2, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(2);
assertEquals(targets.length, 2);
assertEquals(targets[0], dataNodes[0]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
targets = replicator.chooseTarget(filename, 3, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(3);
assertEquals(targets.length, 3);
assertEquals(targets[0], dataNodes[0]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
targets = replicator.chooseTarget(filename, 4, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(4);
assertEquals(targets.length, 4);
assertEquals(targets[0], dataNodes[0]);
assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
@ -163,15 +158,38 @@ public class TestReplicationPolicy {
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
}
private static DatanodeDescriptor[] chooseTarget(int numOfReplicas) {
return chooseTarget(numOfReplicas, dataNodes[0]);
}
private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
DatanodeDescriptor writer) {
return chooseTarget(numOfReplicas, writer,
new ArrayList<DatanodeDescriptor>());
}
private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
List<DatanodeDescriptor> chosenNodes) {
return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes);
}
private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
DatanodeDescriptor writer, List<DatanodeDescriptor> chosenNodes) {
return chooseTarget(numOfReplicas, writer, chosenNodes, null);
}
private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
List<DatanodeDescriptor> chosenNodes, Map<Node, Node> excludedNodes) {
return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes, excludedNodes);
}
private static DatanodeDescriptor[] chooseTarget(
BlockPlacementPolicyDefault policy,
int numOfReplicas,
DatanodeDescriptor writer,
List<DatanodeDescriptor> chosenNodes,
HashMap<Node, Node> excludedNodes,
long blocksize) {
return policy.chooseTarget(numOfReplicas, writer, chosenNodes, false,
excludedNodes, blocksize);
Map<Node, Node> excludedNodes) {
return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes,
false, excludedNodes, BLOCK_SIZE);
}
/**
@ -186,28 +204,24 @@ public class TestReplicationPolicy {
public void testChooseTarget2() throws Exception {
HashMap<Node, Node> excludedNodes;
DatanodeDescriptor[] targets;
BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
excludedNodes = new HashMap<Node, Node>();
excludedNodes.put(dataNodes[1], dataNodes[1]);
targets = chooseTarget(repl, 0, dataNodes[0], chosenNodes, excludedNodes,
BLOCK_SIZE);
targets = chooseTarget(0, chosenNodes, excludedNodes);
assertEquals(targets.length, 0);
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.put(dataNodes[1], dataNodes[1]);
targets = chooseTarget(repl, 1, dataNodes[0], chosenNodes, excludedNodes,
BLOCK_SIZE);
targets = chooseTarget(1, chosenNodes, excludedNodes);
assertEquals(targets.length, 1);
assertEquals(targets[0], dataNodes[0]);
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.put(dataNodes[1], dataNodes[1]);
targets = chooseTarget(repl, 2, dataNodes[0], chosenNodes, excludedNodes,
BLOCK_SIZE);
targets = chooseTarget(2, chosenNodes, excludedNodes);
assertEquals(targets.length, 2);
assertEquals(targets[0], dataNodes[0]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
@ -215,8 +229,7 @@ public class TestReplicationPolicy {
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.put(dataNodes[1], dataNodes[1]);
targets = chooseTarget(repl, 3, dataNodes[0], chosenNodes, excludedNodes,
BLOCK_SIZE);
targets = chooseTarget(3, chosenNodes, excludedNodes);
assertEquals(targets.length, 3);
assertEquals(targets[0], dataNodes[0]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
@ -225,8 +238,7 @@ public class TestReplicationPolicy {
excludedNodes.clear();
chosenNodes.clear();
excludedNodes.put(dataNodes[1], dataNodes[1]);
targets = chooseTarget(repl, 4, dataNodes[0], chosenNodes, excludedNodes,
BLOCK_SIZE);
targets = chooseTarget(4, chosenNodes, excludedNodes);
assertEquals(targets.length, 4);
assertEquals(targets[0], dataNodes[0]);
for(int i=1; i<4; i++) {
@ -240,7 +252,7 @@ public class TestReplicationPolicy {
chosenNodes.clear();
excludedNodes.put(dataNodes[1], dataNodes[1]);
chosenNodes.add(dataNodes[2]);
targets = repl.chooseTarget(1, dataNodes[0], chosenNodes, true,
targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
excludedNodes, BLOCK_SIZE);
System.out.println("targets=" + Arrays.asList(targets));
assertEquals(2, targets.length);
@ -266,30 +278,25 @@ public class TestReplicationPolicy {
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename, 0, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename, 1, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
assertEquals(targets[0], dataNodes[1]);
targets = replicator.chooseTarget(filename, 2, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(2);
assertEquals(targets.length, 2);
assertEquals(targets[0], dataNodes[1]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
targets = replicator.chooseTarget(filename, 3, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(3);
assertEquals(targets.length, 3);
assertEquals(targets[0], dataNodes[1]);
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
targets = replicator.chooseTarget(filename, 4, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(4);
assertEquals(targets.length, 4);
assertEquals(targets[0], dataNodes[1]);
for(int i=1; i<4; i++) {
@ -322,23 +329,19 @@ public class TestReplicationPolicy {
}
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename, 0, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename, 1, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
targets = replicator.chooseTarget(filename, 2, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(2);
assertEquals(targets.length, 2);
assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
targets = replicator.chooseTarget(filename, 3, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(3);
assertEquals(targets.length, 3);
for(int i=0; i<3; i++) {
assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
@ -367,21 +370,17 @@ public class TestReplicationPolicy {
DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/d2/r4");
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename, 0, writerDesc,
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(0, writerDesc);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename, 1, writerDesc,
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(1, writerDesc);
assertEquals(targets.length, 1);
targets = replicator.chooseTarget(filename, 2, writerDesc,
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(2, writerDesc);
assertEquals(targets.length, 2);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
targets = replicator.chooseTarget(filename, 3, writerDesc,
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(3, writerDesc);
assertEquals(targets.length, 3);
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
@ -425,9 +424,7 @@ public class TestReplicationPolicy {
// try to choose NUM_OF_DATANODES which is more than actually available
// nodes.
DatanodeDescriptor[] targets = replicator.chooseTarget(filename,
NUM_OF_DATANODES, dataNodes[0], new ArrayList<DatanodeDescriptor>(),
BLOCK_SIZE);
DatanodeDescriptor[] targets = chooseTarget(NUM_OF_DATANODES);
assertEquals(targets.length, NUM_OF_DATANODES - 2);
final List<LoggingEvent> log = appender.getLog();
@ -470,17 +467,14 @@ public class TestReplicationPolicy {
DatanodeDescriptor[] targets;
// We set the datanode[0] as stale, thus should choose datanode[1] since
// datanode[1] is on the same rack with datanode[0] (writer)
targets = replicator.chooseTarget(filename, 1, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
assertEquals(targets[0], dataNodes[1]);
HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
excludedNodes.put(dataNodes[1], dataNodes[1]);
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
targets = chooseTarget(repl, 1, dataNodes[0], chosenNodes, excludedNodes,
BLOCK_SIZE);
targets = chooseTarget(1, chosenNodes, excludedNodes);
assertEquals(targets.length, 1);
assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
@ -507,33 +501,27 @@ public class TestReplicationPolicy {
namenode.getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename, 0, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
DatanodeDescriptor[] targets = chooseTarget(0);
assertEquals(targets.length, 0);
// Since we have 6 datanodes total, stale nodes should
// not be returned until we ask for more than 3 targets
targets = replicator.chooseTarget(filename, 1, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
assertFalse(containsWithinRange(targets[0], dataNodes, 0, 2));
targets = replicator.chooseTarget(filename, 2, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(2);
assertEquals(targets.length, 2);
assertFalse(containsWithinRange(targets[0], dataNodes, 0, 2));
assertFalse(containsWithinRange(targets[1], dataNodes, 0, 2));
targets = replicator.chooseTarget(filename, 3, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(3);
assertEquals(targets.length, 3);
assertTrue(containsWithinRange(targets[0], dataNodes, 3, 5));
assertTrue(containsWithinRange(targets[1], dataNodes, 3, 5));
assertTrue(containsWithinRange(targets[2], dataNodes, 3, 5));
targets = replicator.chooseTarget(filename, 4, dataNodes[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(4);
assertEquals(targets.length, 4);
assertTrue(containsWithinRange(dataNodes[3], targets, 0, 3));
assertTrue(containsWithinRange(dataNodes[4], targets, 0, 3));
@ -586,7 +574,8 @@ public class TestReplicationPolicy {
BlockPlacementPolicy replicator = miniCluster.getNameNode()
.getNamesystem().getBlockManager().getBlockPlacementPolicy();
DatanodeDescriptor[] targets = replicator.chooseTarget(filename, 3,
staleNodeInfo, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
staleNodeInfo, new ArrayList<DatanodeDescriptor>(), false, null, BLOCK_SIZE);
assertEquals(targets.length, 3);
assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
@ -610,7 +599,7 @@ public class TestReplicationPolicy {
.getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
// Call chooseTarget
targets = replicator.chooseTarget(filename, 3,
staleNodeInfo, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
staleNodeInfo, new ArrayList<DatanodeDescriptor>(), false, null, BLOCK_SIZE);
assertEquals(targets.length, 3);
assertTrue(cluster.isOnSameRack(targets[0], staleNodeInfo));
@ -632,8 +621,7 @@ public class TestReplicationPolicy {
assertTrue(miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().shouldAvoidStaleDataNodesForWrite());
// Call chooseTarget
targets = replicator.chooseTarget(filename, 3,
staleNodeInfo, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(3, staleNodeInfo);
assertEquals(targets.length, 3);
assertFalse(cluster.isOnSameRack(targets[0], staleNodeInfo));
} finally {
@ -654,23 +642,19 @@ public class TestReplicationPolicy {
chosenNodes.add(dataNodes[0]);
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename,
0, dataNodes[0], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename,
1, dataNodes[0], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
targets = replicator.chooseTarget(filename,
2, dataNodes[0], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
targets = replicator.chooseTarget(filename,
3, dataNodes[0], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(3, chosenNodes);
assertEquals(targets.length, 3);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
@ -690,17 +674,14 @@ public class TestReplicationPolicy {
chosenNodes.add(dataNodes[1]);
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename,
0, dataNodes[0], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename,
1, dataNodes[0], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
targets = replicator.chooseTarget(filename,
2, dataNodes[0], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[1]));
@ -720,29 +701,24 @@ public class TestReplicationPolicy {
chosenNodes.add(dataNodes[2]);
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename,
0, dataNodes[0], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename,
1, dataNodes[0], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameRack(dataNodes[2], targets[0]));
targets = replicator.chooseTarget(filename,
1, dataNodes[2], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(1, dataNodes[2], chosenNodes);
assertEquals(targets.length, 1);
assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
targets = replicator.chooseTarget(filename,
2, dataNodes[0], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
targets = replicator.chooseTarget(filename,
2, dataNodes[2], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(2, dataNodes[2], chosenNodes);
assertEquals(targets.length, 2);
assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
}

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.test.PathUtils;
import org.junit.After;
@ -98,10 +99,10 @@ public class TestReplicationPolicyWithNodeGroup extends TestCase {
FileSystem.setDefaultUri(CONF, "hdfs://localhost:0");
CONF.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
// Set properties to make HDFS aware of NodeGroup.
CONF.set("dfs.block.replicator.classname",
"org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyWithNodeGroup");
CONF.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
BlockPlacementPolicyWithNodeGroup.class.getName());
CONF.set(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
"org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
NetworkTopologyWithNodeGroup.class.getName());
File baseDir = PathUtils.getTestDir(TestReplicationPolicyWithNodeGroup.class);
@ -153,6 +154,35 @@ public class TestReplicationPolicyWithNodeGroup extends TestCase {
return true;
}
private DatanodeDescriptor[] chooseTarget(int numOfReplicas) {
return chooseTarget(numOfReplicas, dataNodes[0]);
}
private DatanodeDescriptor[] chooseTarget(int numOfReplicas,
DatanodeDescriptor writer) {
return chooseTarget(numOfReplicas, writer,
new ArrayList<DatanodeDescriptor>());
}
private DatanodeDescriptor[] chooseTarget(int numOfReplicas,
List<DatanodeDescriptor> chosenNodes) {
return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes);
}
private DatanodeDescriptor[] chooseTarget(int numOfReplicas,
DatanodeDescriptor writer, List<DatanodeDescriptor> chosenNodes) {
return chooseTarget(numOfReplicas, writer, chosenNodes, null);
}
private DatanodeDescriptor[] chooseTarget(
int numOfReplicas,
DatanodeDescriptor writer,
List<DatanodeDescriptor> chosenNodes,
Map<Node, Node> excludedNodes) {
return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes,
false, excludedNodes, BLOCK_SIZE);
}
/**
* In this testcase, client is dataNodes[0]. So the 1st replica should be
* placed on dataNodes[0], the 2nd replica should be placed on
@ -168,31 +198,26 @@ public class TestReplicationPolicyWithNodeGroup extends TestCase {
HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename,
0, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename,
1, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
assertEquals(targets[0], dataNodes[0]);
targets = replicator.chooseTarget(filename,
2, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(2);
assertEquals(targets.length, 2);
assertEquals(targets[0], dataNodes[0]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
targets = replicator.chooseTarget(filename,
3, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(3);
assertEquals(targets.length, 3);
assertEquals(targets[0], dataNodes[0]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
assertFalse(cluster.isOnSameNodeGroup(targets[1], targets[2]));
targets = replicator.chooseTarget(filename,
4, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(4);
assertEquals(targets.length, 4);
assertEquals(targets[0], dataNodes[0]);
assertTrue(cluster.isOnSameRack(targets[1], targets[2]) ||
@ -230,7 +255,7 @@ public class TestReplicationPolicyWithNodeGroup extends TestCase {
excludedNodes = new HashMap<Node, Node>();
excludedNodes.put(dataNodes[1], dataNodes[1]);
targets = repl.chooseTarget(4, dataNodes[0], chosenNodes, false,
targets = repl.chooseTarget(filename, 4, dataNodes[0], chosenNodes, false,
excludedNodes, BLOCK_SIZE);
assertEquals(targets.length, 4);
assertEquals(targets[0], dataNodes[0]);
@ -247,7 +272,7 @@ public class TestReplicationPolicyWithNodeGroup extends TestCase {
chosenNodes.clear();
excludedNodes.put(dataNodes[1], dataNodes[1]);
chosenNodes.add(dataNodes[2]);
targets = repl.chooseTarget(1, dataNodes[0], chosenNodes, true,
targets = repl.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
excludedNodes, BLOCK_SIZE);
System.out.println("targets=" + Arrays.asList(targets));
assertEquals(2, targets.length);
@ -272,30 +297,25 @@ public class TestReplicationPolicyWithNodeGroup extends TestCase {
(HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename,
0, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename,
1, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
assertEquals(targets[0], dataNodes[1]);
targets = replicator.chooseTarget(filename,
2, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(2);
assertEquals(targets.length, 2);
assertEquals(targets[0], dataNodes[1]);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
targets = replicator.chooseTarget(filename,
3, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(3);
assertEquals(targets.length, 3);
assertEquals(targets[0], dataNodes[1]);
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
targets = replicator.chooseTarget(filename,
4, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(4);
assertEquals(targets.length, 4);
assertEquals(targets[0], dataNodes[1]);
assertTrue(cluster.isNodeGroupAware());
@ -326,23 +346,19 @@ public class TestReplicationPolicyWithNodeGroup extends TestCase {
}
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename,
0, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(0);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename,
1, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(1);
assertEquals(targets.length, 1);
assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
targets = replicator.chooseTarget(filename,
2, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(2);
assertEquals(targets.length, 2);
assertFalse(cluster.isOnSameRack(targets[0], dataNodes[0]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
targets = replicator.chooseTarget(filename,
3, dataNodes[0], new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(3);
assertEquals(targets.length, 3);
for(int i=0; i<3; i++) {
assertFalse(cluster.isOnSameRack(targets[i], dataNodes[0]));
@ -363,21 +379,17 @@ public class TestReplicationPolicyWithNodeGroup extends TestCase {
public void testChooseTarget5() throws Exception {
setupDataNodeCapacity();
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename,
0, NODE, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(0, NODE);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename,
1, NODE, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(1, NODE);
assertEquals(targets.length, 1);
targets = replicator.chooseTarget(filename,
2, NODE, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(2, NODE);
assertEquals(targets.length, 2);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
targets = replicator.chooseTarget(filename,
3, NODE, new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(3, NODE);
assertEquals(targets.length, 3);
assertTrue(cluster.isOnSameRack(targets[1], targets[2]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
@ -397,23 +409,19 @@ public class TestReplicationPolicyWithNodeGroup extends TestCase {
chosenNodes.add(dataNodes[0]);
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename,
0, dataNodes[0], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename,
1, dataNodes[0], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
targets = replicator.chooseTarget(filename,
2, dataNodes[0], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
targets = replicator.chooseTarget(filename,
3, dataNodes[0], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(3, chosenNodes);
assertEquals(targets.length, 3);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0]));
@ -434,17 +442,14 @@ public class TestReplicationPolicyWithNodeGroup extends TestCase {
chosenNodes.add(dataNodes[1]);
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename,
0, dataNodes[0], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename,
1, dataNodes[0], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
targets = replicator.chooseTarget(filename,
2, dataNodes[0], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]) &&
cluster.isOnSameRack(dataNodes[0], targets[1]));
@ -464,30 +469,26 @@ public class TestReplicationPolicyWithNodeGroup extends TestCase {
chosenNodes.add(dataNodes[3]);
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename,
0, dataNodes[0], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(0, chosenNodes);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename,
1, dataNodes[0], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(1, chosenNodes);
assertEquals(targets.length, 1);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameRack(dataNodes[3], targets[0]));
targets = replicator.chooseTarget(filename,
1, dataNodes[3], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(1, dataNodes[3], chosenNodes);
assertEquals(targets.length, 1);
assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0]));
assertFalse(cluster.isOnSameNodeGroup(dataNodes[3], targets[0]));
assertFalse(cluster.isOnSameRack(dataNodes[0], targets[0]));
targets = replicator.chooseTarget(filename,
2, dataNodes[0], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(2, chosenNodes);
assertEquals(targets.length, 2);
assertTrue(cluster.isOnSameRack(dataNodes[0], targets[0]));
assertFalse(cluster.isOnSameNodeGroup(dataNodes[0], targets[0]));
targets = replicator.chooseTarget(filename,
2, dataNodes[3], chosenNodes, BLOCK_SIZE);
targets = chooseTarget(2, dataNodes[3], chosenNodes);
assertEquals(targets.length, 2);
assertTrue(cluster.isOnSameRack(dataNodes[3], targets[0]));
}
@ -573,21 +574,17 @@ public class TestReplicationPolicyWithNodeGroup extends TestCase {
}
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename, 0, dataNodesInBoundaryCase[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(0, dataNodesInBoundaryCase[0]);
assertEquals(targets.length, 0);
targets = replicator.chooseTarget(filename, 1, dataNodesInBoundaryCase[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(1, dataNodesInBoundaryCase[0]);
assertEquals(targets.length, 1);
targets = replicator.chooseTarget(filename, 2, dataNodesInBoundaryCase[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(2, dataNodesInBoundaryCase[0]);
assertEquals(targets.length, 2);
assertFalse(cluster.isOnSameRack(targets[0], targets[1]));
targets = replicator.chooseTarget(filename, 3, dataNodesInBoundaryCase[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(3, dataNodesInBoundaryCase[0]);
assertEquals(targets.length, 3);
assertTrue(checkTargetsOnDifferentNodeGroup(targets));
}
@ -610,8 +607,7 @@ public class TestReplicationPolicyWithNodeGroup extends TestCase {
chosenNodes.add(dataNodesInBoundaryCase[0]);
chosenNodes.add(dataNodesInBoundaryCase[5]);
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename, 1, dataNodesInBoundaryCase[0],
chosenNodes, BLOCK_SIZE);
targets = chooseTarget(1, dataNodesInBoundaryCase[0], chosenNodes);
assertFalse(cluster.isOnSameNodeGroup(targets[0],
dataNodesInBoundaryCase[0]));
assertFalse(cluster.isOnSameNodeGroup(targets[0],
@ -650,14 +646,12 @@ public class TestReplicationPolicyWithNodeGroup extends TestCase {
DatanodeDescriptor[] targets;
// Test normal case -- 3 replicas
targets = replicator.chooseTarget(filename, 3, dataNodesInMoreTargetsCase[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(3, dataNodesInMoreTargetsCase[0]);
assertEquals(targets.length, 3);
assertTrue(checkTargetsOnDifferentNodeGroup(targets));
// Test special case -- replica count exceeds the number of node groups.
targets = replicator.chooseTarget(filename, 10, dataNodesInMoreTargetsCase[0],
new ArrayList<DatanodeDescriptor>(), BLOCK_SIZE);
targets = chooseTarget(10, dataNodesInMoreTargetsCase[0]);
assertTrue(checkTargetsOnDifferentNodeGroup(targets));
// Verify that it can only find 6 targets for placing replicas.
assertEquals(targets.length, 6);

View File

@ -96,8 +96,8 @@ public class TestDNFencing {
// Increase max streams so that we re-replicate quickly.
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 1000);
// See RandomDeleterPolicy javadoc.
conf.setClass("dfs.block.replicator.classname", RandomDeleterPolicy.class,
BlockPlacementPolicy.class);
conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
RandomDeleterPolicy.class, BlockPlacementPolicy.class);
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())