HDFS-5207. In BlockPlacementPolicy.chooseTarget(..), change the writer and the excludedNodes parameter types respectively to Node and Set. Contributed by Junping Du

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1523875 13f79535-47bb-0310-9956-ffa450edef68
Author: Tsz-wo Sze
Date:   2013-09-17 02:38:47 +00:00
Parent: 90f43b1c18
Commit: f98c343c7f

10 changed files with 121 additions and 105 deletions
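At the API level the change is twofold: the writer argument is widened from DatanodeDescriptor to the more general Node (so a machine that is not itself a datanode can still be passed once resolved in the topology), and the excluded-node collection becomes a Set<Node> instead of a Map<Node, Node> that was only ever keyed by its own values. The following is a minimal caller-side sketch of the migration; the policy, deadNode, and chosenNodes variables are hypothetical, while the chooseTarget signature follows the BlockPlacementPolicy diff below.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.net.Node;

class ChooseTargetCaller {
  // Hypothetical caller showing the post-HDFS-5207 shape of a chooseTarget call.
  DatanodeDescriptor[] pickTargets(BlockPlacementPolicy policy, String src,
      Node writer, Node deadNode, List<DatanodeDescriptor> chosenNodes,
      long blockSize) {
    Set<Node> excludedNodes = new HashSet<Node>(); // was Map<Node, Node>
    excludedNodes.add(deadNode);                   // was put(deadNode, deadNode)
    // The writer may now be any Node, not only a DatanodeDescriptor.
    return policy.chooseTarget(src, 3, writer, chosenNodes,
        false, excludedNodes, blockSize);
  }
}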

CHANGES.txt

@@ -279,6 +279,10 @@ Release 2.3.0 - UNRELEASED
     methods; replace HashMap with Map in parameter declarations and cleanup
     some related code.  (szetszwo)
 
+    HDFS-5207. In BlockPlacementPolicy.chooseTarget(..), change the writer
+    and the excludedNodes parameter types respectively to Node and Set.
+    (Junping Du via szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES

BlockManager.java

@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -1256,13 +1257,13 @@ public class BlockManager {
       namesystem.writeUnlock();
     }
 
-    final Map<Node, Node> excludedNodes = new HashMap<Node, Node>();
+    final Set<Node> excludedNodes = new HashSet<Node>();
     for(ReplicationWork rw : work){
       // Exclude all of the containing nodes from being targets.
       // This list includes decommissioning or corrupt nodes.
       excludedNodes.clear();
       for (DatanodeDescriptor dn : rw.containingNodes) {
-        excludedNodes.put(dn, dn);
+        excludedNodes.add(dn);
       }
 
       // choose replication targets: NOT HOLDING THE GLOBAL LOCK
@@ -1375,12 +1376,12 @@ public class BlockManager {
    *
    * @throws IOException
    *           if the number of targets < minimum replication.
-   * @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor,
-   *      List, boolean, HashMap, long)
+   * @see BlockPlacementPolicy#chooseTarget(String, int, Node,
+   *      List, boolean, Set, long)
    */
   public DatanodeDescriptor[] chooseTarget(final String src,
       final int numOfReplicas, final DatanodeDescriptor client,
-      final HashMap<Node, Node> excludedNodes,
+      final Set<Node> excludedNodes,
       final long blocksize, List<String> favoredNodes) throws IOException {
     List<DatanodeDescriptor> favoredDatanodeDescriptors =
         getDatanodeDescriptors(favoredNodes);
@@ -3248,7 +3249,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
     }
 
     private void chooseTargets(BlockPlacementPolicy blockplacement,
-        Map<Node, Node> excludedNodes) {
+        Set<Node> excludedNodes) {
       targets = blockplacement.chooseTarget(bc.getName(),
           additionalReplRequired, srcNode, liveReplicaNodes, false,
           excludedNodes, block.getNumBytes());

BlockPlacementPolicy.java

@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -68,22 +69,22 @@ public abstract class BlockPlacementPolicy {
    */
   public abstract DatanodeDescriptor[] chooseTarget(String srcPath,
                                             int numOfReplicas,
-                                            DatanodeDescriptor writer,
+                                            Node writer,
                                             List<DatanodeDescriptor> chosenNodes,
                                             boolean returnChosenNodes,
-                                            Map<Node, Node> excludedNodes,
+                                            Set<Node> excludedNodes,
                                             long blocksize);
 
   /**
-   * Same as {@link #chooseTarget(String, int, DatanodeDescriptor, List, boolean,
-   * HashMap, long)} with added parameter {@code favoredDatanodes}
+   * Same as {@link #chooseTarget(String, int, Node, List, boolean,
+   * Set, long)} with added parameter {@code favoredDatanodes}
    * @param favoredNodes datanodes that should be favored as targets. This
    *          is only a hint and due to cluster state, namenode may not be
    *          able to place the blocks on these datanodes.
    */
   DatanodeDescriptor[] chooseTarget(String src,
-      int numOfReplicas, DatanodeDescriptor writer,
-      Map<Node, Node> excludedNodes,
+      int numOfReplicas, Node writer,
+      Set<Node> excludedNodes,
       long blocksize, List<DatanodeDescriptor> favoredNodes) {
     // This class does not provide the functionality of placing
     // a block in favored datanodes. The implementations of this class

BlockPlacementPolicyDefault.java

@@ -21,9 +21,8 @@ import static org.apache.hadoop.util.Time.now;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
@@ -106,10 +105,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   @Override
   public DatanodeDescriptor[] chooseTarget(String srcPath,
                                     int numOfReplicas,
-                                    DatanodeDescriptor writer,
+                                    Node writer,
                                     List<DatanodeDescriptor> chosenNodes,
                                     boolean returnChosenNodes,
-                                    Map<Node, Node> excludedNodes,
+                                    Set<Node> excludedNodes,
                                     long blocksize) {
     return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
         excludedNodes, blocksize);
@@ -118,8 +117,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   @Override
   DatanodeDescriptor[] chooseTarget(String src,
       int numOfReplicas,
-      DatanodeDescriptor writer,
-      Map<Node, Node> excludedNodes,
+      Node writer,
+      Set<Node> excludedNodes,
       long blocksize,
       List<DatanodeDescriptor> favoredNodes) {
     try {
@@ -130,8 +129,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
             excludedNodes, blocksize);
       }
 
-      Map<Node, Node> favoriteAndExcludedNodes = excludedNodes == null ?
-          new HashMap<Node, Node>() : new HashMap<Node, Node>(excludedNodes);
+      Set<Node> favoriteAndExcludedNodes = excludedNodes == null ?
+          new HashSet<Node>() : new HashSet<Node>(excludedNodes);
 
       // Choose favored nodes
       List<DatanodeDescriptor> results = new ArrayList<DatanodeDescriptor>();
@@ -150,10 +149,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
               + " with favored node " + favoredNode);
           continue;
         }
-        favoriteAndExcludedNodes.put(target, target);
+        favoriteAndExcludedNodes.add(target);
       }
 
       if (results.size() < numOfReplicas) {
         // Not enough favored nodes, choose other nodes.
         numOfReplicas -= results.size();
         DatanodeDescriptor[] remainingTargets =
@@ -175,17 +174,17 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   /** This is the implementation. */
   private DatanodeDescriptor[] chooseTarget(int numOfReplicas,
-                                    DatanodeDescriptor writer,
+                                    Node writer,
                                     List<DatanodeDescriptor> chosenNodes,
                                     boolean returnChosenNodes,
-                                    Map<Node, Node> excludedNodes,
+                                    Set<Node> excludedNodes,
                                     long blocksize) {
     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
       return DatanodeDescriptor.EMPTY_ARRAY;
     }
 
     if (excludedNodes == null) {
-      excludedNodes = new HashMap<Node, Node>();
+      excludedNodes = new HashSet<Node>();
     }
 
     int[] result = getMaxNodesPerRack(chosenNodes, numOfReplicas);
@@ -200,12 +199,12 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
 
     if (!clusterMap.contains(writer)) {
-      writer=null;
+      writer = null;
     }
 
     boolean avoidStaleNodes = (stats != null
         && stats.isAvoidingStaleDataNodesForWrite());
-    DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
+    Node localNode = chooseTarget(numOfReplicas, writer,
         excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
     if (!returnChosenNodes) {
       results.removeAll(chosenNodes);
@@ -228,10 +227,20 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     return new int[] {numOfReplicas, maxNodesPerRack};
   }
 
-  /* choose <i>numOfReplicas</i> from all data nodes */
-  private DatanodeDescriptor chooseTarget(int numOfReplicas,
-                            DatanodeDescriptor writer,
-                            Map<Node, Node> excludedNodes,
+  /**
+   * choose <i>numOfReplicas</i> from all data nodes
+   * @param numOfReplicas additional number of replicas wanted
+   * @param writer the writer's machine, could be a non-DatanodeDescriptor node
+   * @param excludedNodes datanodes that should not be considered as targets
+   * @param blocksize size of the data to be written
+   * @param maxNodesPerRack max nodes allowed per rack
+   * @param results the target nodes already chosen
+   * @param avoidStaleNodes avoid stale nodes in replica choosing
+   * @return local node of writer (not chosen node)
+   */
+  private Node chooseTarget(int numOfReplicas,
+                            Node writer,
+                            Set<Node> excludedNodes,
                             long blocksize,
                             int maxNodesPerRack,
                             List<DatanodeDescriptor> results,
@@ -243,13 +252,13 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
 
     int numOfResults = results.size();
     boolean newBlock = (numOfResults==0);
-    if (writer == null && !newBlock) {
+    if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) {
       writer = results.get(0);
     }
 
     // Keep a copy of original excludedNodes
-    final Map<Node, Node> oldExcludedNodes = avoidStaleNodes ?
-        new HashMap<Node, Node>(excludedNodes) : null;
+    final Set<Node> oldExcludedNodes = avoidStaleNodes ?
+        new HashSet<Node>(excludedNodes) : null;
     try {
       if (numOfResults == 0) {
         writer = chooseLocalNode(writer, excludedNodes, blocksize,
@@ -296,7 +305,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         // We need to additionally exclude the nodes that were added to the
         // result list in the successful calls to choose*() above.
         for (Node node : results) {
-          oldExcludedNodes.put(node, node);
+          oldExcludedNodes.add(node);
         }
         // Set numOfReplicas, since it can get out of sync with the result list
         // if the NotEnoughReplicasException was thrown in chooseRandom().
@@ -314,8 +323,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * choose a node on the same rack
    * @return the chosen node
    */
-  protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
-                                             Map<Node, Node> excludedNodes,
+  protected DatanodeDescriptor chooseLocalNode(Node localMachine,
+                                             Set<Node> excludedNodes,
                                              long blocksize,
                                              int maxNodesPerRack,
                                              List<DatanodeDescriptor> results,
@@ -325,13 +334,13 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     if (localMachine == null)
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
           maxNodesPerRack, results, avoidStaleNodes);
-    if (preferLocalNode) {
+    if (preferLocalNode && localMachine instanceof DatanodeDescriptor) {
+      DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
       // otherwise try local machine first
-      Node oldNode = excludedNodes.put(localMachine, localMachine);
-      if (oldNode == null) { // was not in the excluded list
-        if (addIfIsGoodTarget(localMachine, excludedNodes, blocksize,
+      if (excludedNodes.add(localMachine)) { // was not in the excluded list
+        if (addIfIsGoodTarget(localDatanode, excludedNodes, blocksize,
             maxNodesPerRack, false, results, avoidStaleNodes) >= 0) {
-          return localMachine;
+          return localDatanode;
         }
       }
     }
@@ -347,9 +356,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * @return number of new excluded nodes
    */
   protected int addToExcludedNodes(DatanodeDescriptor localMachine,
-      Map<Node, Node> excludedNodes) {
-    Node node = excludedNodes.put(localMachine, localMachine);
-    return node == null?1:0;
+      Set<Node> excludedNodes) {
+    return excludedNodes.add(localMachine) ? 1 : 0;
   }
 
   /**
@@ -360,8 +368,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * in the cluster.
    * @return the chosen node
    */
-  protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
-                                             Map<Node, Node> excludedNodes,
+  protected DatanodeDescriptor chooseLocalRack(Node localMachine,
+                                             Set<Node> excludedNodes,
                                              long blocksize,
                                              int maxNodesPerRack,
                                              List<DatanodeDescriptor> results,
@@ -412,7 +420,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   protected void chooseRemoteRack(int numOfReplicas,
                                 DatanodeDescriptor localMachine,
-                                Map<Node, Node> excludedNodes,
+                                Set<Node> excludedNodes,
                                 long blocksize,
                                 int maxReplicasPerRack,
                                 List<DatanodeDescriptor> results,
@@ -436,7 +444,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * @return the chosen node, if there is any.
    */
   protected DatanodeDescriptor chooseRandom(String scope,
-      Map<Node, Node> excludedNodes,
+      Set<Node> excludedNodes,
       long blocksize,
       int maxNodesPerRack,
       List<DatanodeDescriptor> results,
@@ -452,7 +460,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    */
   protected DatanodeDescriptor chooseRandom(int numOfReplicas,
                             String scope,
-                            Map<Node, Node> excludedNodes,
+                            Set<Node> excludedNodes,
                             long blocksize,
                             int maxNodesPerRack,
                             List<DatanodeDescriptor> results,
@@ -460,7 +468,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       throws NotEnoughReplicasException {
 
     int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(
-        scope, excludedNodes.keySet());
+        scope, excludedNodes);
     StringBuilder builder = null;
     if (LOG.isDebugEnabled()) {
       builder = debugLoggingBuilder.get();
@@ -472,8 +480,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
       DatanodeDescriptor chosenNode =
           (DatanodeDescriptor)clusterMap.chooseRandom(scope);
-      Node oldNode = excludedNodes.put(chosenNode, chosenNode);
-      if (oldNode == null) {
+      if (excludedNodes.add(chosenNode)) { //was not in the excluded list
         numOfAvailableNodes--;
 
         int newExcludedNodes = addIfIsGoodTarget(chosenNode, excludedNodes,
@@ -506,16 +513,16 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
 
   /**
    * If the given node is a good target, add it to the result list and
-   * update the excluded node map.
+   * update the set of excluded nodes.
    * @return -1 if the given is not a good target;
-   *         otherwise, return the number of excluded nodes added to the map.
+   *         otherwise, return the number of nodes added to excludedNodes set.
   */
   int addIfIsGoodTarget(DatanodeDescriptor node,
-      Map<Node, Node> excludedNodes,
+      Set<Node> excludedNodes,
       long blockSize,
       int maxNodesPerRack,
       boolean considerLoad,
      List<DatanodeDescriptor> results,
      boolean avoidStaleNodes) {
    if (isGoodTarget(node, blockSize, maxNodesPerRack, considerLoad,
        results, avoidStaleNodes)) {
@@ -614,7 +621,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * starts from the writer and traverses all <i>nodes</i>
    * This is basically a traveling salesman problem.
    */
-  private DatanodeDescriptor[] getPipeline(DatanodeDescriptor writer,
+  private DatanodeDescriptor[] getPipeline(Node writer,
                                            DatanodeDescriptor[] nodes) {
     if (nodes.length==0) return nodes;
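One mechanical pattern repeats throughout the file above: the old code tested membership-on-insert by checking whether Map.put returned null (meaning the key was absent), whereas Set.add reports the same fact directly through its boolean return value. Below is a self-contained illustration of that equivalence, using plain strings rather than Hadoop's Node type:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PutVsAdd {
  public static void main(String[] args) {
    // Old idiom: a Map abused as a set; put returns null iff the key was new.
    Map<String, String> excludedMap = new HashMap<String, String>();
    boolean wasNewViaMap = excludedMap.put("dn1", "dn1") == null; // true
    boolean againViaMap  = excludedMap.put("dn1", "dn1") == null; // false

    // New idiom: Set.add returns true iff the element was not yet present.
    Set<String> excludedSet = new HashSet<String>();
    boolean wasNewViaSet = excludedSet.add("dn1"); // true
    boolean againViaSet  = excludedSet.add("dn1"); // false

    System.out.println(wasNewViaMap + " " + againViaMap); // true false
    System.out.println(wasNewViaSet + " " + againViaSet); // true false
  }
}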

BlockPlacementPolicyWithNodeGroup.java

@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -63,8 +64,8 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
    * @return the chosen node
    */
   @Override
-  protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
-      Map<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+  protected DatanodeDescriptor chooseLocalNode(Node localMachine,
+      Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeDescriptor> results, boolean avoidStaleNodes)
       throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
@@ -72,14 +73,16 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
       return chooseRandom(NodeBase.ROOT, excludedNodes,
           blocksize, maxNodesPerRack, results, avoidStaleNodes);
 
-    // otherwise try local machine first
-    Node oldNode = excludedNodes.put(localMachine, localMachine);
-    if (oldNode == null) { // was not in the excluded list
-      if (addIfIsGoodTarget(localMachine, excludedNodes, blocksize,
-          maxNodesPerRack, false, results, avoidStaleNodes) >= 0) {
-        return localMachine;
+    if (localMachine instanceof DatanodeDescriptor) {
+      DatanodeDescriptor localDataNode = (DatanodeDescriptor)localMachine;
+      // otherwise try local machine first
+      if (excludedNodes.add(localMachine)) { // was not in the excluded list
+        if (addIfIsGoodTarget(localDataNode, excludedNodes, blocksize,
+            maxNodesPerRack, false, results, avoidStaleNodes) >= 0) {
+          return localDataNode;
+        }
       }
     }
 
     // try a node on local node group
     DatanodeDescriptor chosenNode = chooseLocalNodeGroup(
@@ -95,8 +98,8 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
   @Override
-  protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
-      Map<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+  protected DatanodeDescriptor chooseLocalRack(Node localMachine,
+      Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeDescriptor> results, boolean avoidStaleNodes)
       throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
@@ -142,7 +145,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
   @Override
   protected void chooseRemoteRack(int numOfReplicas,
-      DatanodeDescriptor localMachine, Map<Node, Node> excludedNodes,
+      DatanodeDescriptor localMachine, Set<Node> excludedNodes,
       long blocksize, int maxReplicasPerRack, List<DatanodeDescriptor> results,
       boolean avoidStaleNodes) throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
@@ -168,8 +171,8 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
    * @return the chosen node
    */
   private DatanodeDescriptor chooseLocalNodeGroup(
-      NetworkTopologyWithNodeGroup clusterMap, DatanodeDescriptor localMachine,
-      Map<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      NetworkTopologyWithNodeGroup clusterMap, Node localMachine,
+      Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeDescriptor> results, boolean avoidStaleNodes)
       throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
@@ -225,13 +228,12 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
    */
   @Override
   protected int addToExcludedNodes(DatanodeDescriptor chosenNode,
-      Map<Node, Node> excludedNodes) {
+      Set<Node> excludedNodes) {
     int countOfExcludedNodes = 0;
     String nodeGroupScope = chosenNode.getNetworkLocation();
     List<Node> leafNodes = clusterMap.getLeaves(nodeGroupScope);
     for (Node leafNode : leafNodes) {
-      Node node = excludedNodes.put(leafNode, leafNode);
-      if (node == null) {
+      if (excludedNodes.add(leafNode)) {
        // not a existing node in excludedNodes
        countOfExcludedNodes++;
      }
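Because the writer (localMachine) is now a general Node, both chooseLocalNode overrides above guard the local-target fast path with an instanceof check before casting: only a DatanodeDescriptor can actually store a replica, so any other Node falls through to node-group, rack, or cluster-wide selection. A sketch of that guard in isolation follows; the helper name and the null fallback are illustrative, not part of the patch:

import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.net.Node;

class WriterGuard {
  // Hypothetical helper: returns the writer as a storage target when it is
  // itself a datanode, or null when it is merely a topology Node (for
  // example a client host), in which case the caller should fall back to
  // rack- or cluster-level selection.
  static DatanodeDescriptor asLocalTarget(Node writer) {
    if (writer instanceof DatanodeDescriptor) {
      return (DatanodeDescriptor) writer; // safe cast: writer runs a datanode
    }
    return null;
  }
}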

FSNamesystem.java

@@ -223,7 +223,6 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
@@ -2443,7 +2442,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * client to "try again later".
    */
   LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
-      ExtendedBlock previous, HashMap<Node, Node> excludedNodes,
+      ExtendedBlock previous, Set<Node> excludedNodes,
       List<String> favoredNodes)
       throws LeaseExpiredException, NotReplicatedYetException,
       QuotaExceededException, SafeModeException, UnresolvedLinkException,
@@ -2642,7 +2641,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   /** @see NameNode#getAdditionalDatanode(String, ExtendedBlock, DatanodeInfo[], DatanodeInfo[], int, String) */
   LocatedBlock getAdditionalDatanode(String src, final ExtendedBlock blk,
-      final DatanodeInfo[] existings, final HashMap<Node, Node> excludes,
+      final DatanodeInfo[] existings, final Set<Node> excludes,
       final int numAdditionalNodes, final String clientName
       ) throws IOException {
     //check if the feature is enabled

NameNodeRpcServer.java

@@ -29,8 +29,9 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -547,11 +548,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
       stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src
           + " fileId=" + fileId + " for " + clientName);
     }
-    HashMap<Node, Node> excludedNodesSet = null;
+    Set<Node> excludedNodesSet = null;
     if (excludedNodes != null) {
-      excludedNodesSet = new HashMap<Node, Node>(excludedNodes.length);
+      excludedNodesSet = new HashSet<Node>(excludedNodes.length);
       for (Node node : excludedNodes) {
-        excludedNodesSet.put(node, node);
+        excludedNodesSet.add(node);
       }
     }
     List<String> favoredNodesList = (favoredNodes == null) ? null
@@ -579,11 +580,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
     metrics.incrGetAdditionalDatanodeOps();
 
-    HashMap<Node, Node> excludeSet = null;
+    Set<Node> excludeSet = null;
     if (excludes != null) {
-      excludeSet = new HashMap<Node, Node>(excludes.length);
+      excludeSet = new HashSet<Node>(excludes.length);
       for (Node node : excludes) {
-        excludeSet.put(node, node);
+        excludeSet.add(node);
       }
     }
     return namesystem.getAdditionalDatanode(src, blk,

TestReplicationPolicy.java

@@ -29,9 +29,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
@@ -187,7 +189,7 @@ public class TestReplicationPolicy {
   }
 
   private static DatanodeDescriptor[] chooseTarget(int numOfReplicas,
-      List<DatanodeDescriptor> chosenNodes, Map<Node, Node> excludedNodes) {
+      List<DatanodeDescriptor> chosenNodes, Set<Node> excludedNodes) {
     return chooseTarget(numOfReplicas, dataNodes[0], chosenNodes, excludedNodes);
   }
 
@@ -195,7 +197,7 @@ public class TestReplicationPolicy {
       int numOfReplicas,
       DatanodeDescriptor writer,
       List<DatanodeDescriptor> chosenNodes,
-      Map<Node, Node> excludedNodes) {
+      Set<Node> excludedNodes) {
     return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes,
         false, excludedNodes, BLOCK_SIZE);
   }
@@ -210,25 +212,25 @@ public class TestReplicationPolicy {
    */
   @Test
   public void testChooseTarget2() throws Exception {
-    HashMap<Node, Node> excludedNodes;
+    Set<Node> excludedNodes;
     DatanodeDescriptor[] targets;
     List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
 
-    excludedNodes = new HashMap<Node, Node>();
-    excludedNodes.put(dataNodes[1], dataNodes[1]);
+    excludedNodes = new HashSet<Node>();
+    excludedNodes.add(dataNodes[1]);
     targets = chooseTarget(0, chosenNodes, excludedNodes);
     assertEquals(targets.length, 0);
 
     excludedNodes.clear();
     chosenNodes.clear();
-    excludedNodes.put(dataNodes[1], dataNodes[1]);
+    excludedNodes.add(dataNodes[1]);
     targets = chooseTarget(1, chosenNodes, excludedNodes);
     assertEquals(targets.length, 1);
     assertEquals(targets[0], dataNodes[0]);
 
     excludedNodes.clear();
     chosenNodes.clear();
-    excludedNodes.put(dataNodes[1], dataNodes[1]);
+    excludedNodes.add(dataNodes[1]);
     targets = chooseTarget(2, chosenNodes, excludedNodes);
     assertEquals(targets.length, 2);
     assertEquals(targets[0], dataNodes[0]);
@@ -236,7 +238,7 @@ public class TestReplicationPolicy {
     excludedNodes.clear();
     chosenNodes.clear();
-    excludedNodes.put(dataNodes[1], dataNodes[1]);
+    excludedNodes.add(dataNodes[1]);
     targets = chooseTarget(3, chosenNodes, excludedNodes);
     assertEquals(targets.length, 3);
     assertEquals(targets[0], dataNodes[0]);
@@ -245,7 +247,7 @@ public class TestReplicationPolicy {
     excludedNodes.clear();
     chosenNodes.clear();
-    excludedNodes.put(dataNodes[1], dataNodes[1]);
+    excludedNodes.add(dataNodes[1]);
     targets = chooseTarget(4, chosenNodes, excludedNodes);
     assertEquals(targets.length, 4);
     assertEquals(targets[0], dataNodes[0]);
@@ -258,7 +260,7 @@ public class TestReplicationPolicy {
     excludedNodes.clear();
     chosenNodes.clear();
-    excludedNodes.put(dataNodes[1], dataNodes[1]);
+    excludedNodes.add(dataNodes[1]);
     chosenNodes.add(dataNodes[2]);
     targets = replicator.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
         excludedNodes, BLOCK_SIZE);
@@ -479,8 +481,8 @@ public class TestReplicationPolicy {
     assertEquals(targets.length, 1);
     assertEquals(targets[0], dataNodes[1]);
 
-    HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
-    excludedNodes.put(dataNodes[1], dataNodes[1]);
+    Set<Node> excludedNodes = new HashSet<Node>();
+    excludedNodes.add(dataNodes[1]);
     List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
     targets = chooseTarget(1, chosenNodes, excludedNodes);
     assertEquals(targets.length, 1);

TestReplicationPolicyWithNodeGroup.java

@@ -181,7 +181,7 @@ public class TestReplicationPolicyWithNodeGroup {
       int numOfReplicas,
       DatanodeDescriptor writer,
       List<DatanodeDescriptor> chosenNodes,
-      Map<Node, Node> excludedNodes) {
+      Set<Node> excludedNodes) {
     return replicator.chooseTarget(filename, numOfReplicas, writer, chosenNodes,
         false, excludedNodes, BLOCK_SIZE);
   }
@@ -252,14 +252,13 @@ public class TestReplicationPolicyWithNodeGroup {
    * @throws Exception
    */
   @Test
   public void testChooseTarget2() throws Exception {
-    HashMap<Node, Node> excludedNodes;
     DatanodeDescriptor[] targets;
     BlockPlacementPolicyDefault repl = (BlockPlacementPolicyDefault)replicator;
     List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
 
-    excludedNodes = new HashMap<Node, Node>();
-    excludedNodes.put(dataNodes[1], dataNodes[1]);
+    Set<Node> excludedNodes = new HashSet<Node>();
+    excludedNodes.add(dataNodes[1]);
     targets = repl.chooseTarget(filename, 4, dataNodes[0], chosenNodes, false,
         excludedNodes, BLOCK_SIZE);
     assertEquals(targets.length, 4);
@@ -275,7 +274,7 @@ public class TestReplicationPolicyWithNodeGroup {
     excludedNodes.clear();
     chosenNodes.clear();
-    excludedNodes.put(dataNodes[1], dataNodes[1]);
+    excludedNodes.add(dataNodes[1]);
     chosenNodes.add(dataNodes[2]);
     targets = repl.chooseTarget(filename, 1, dataNodes[0], chosenNodes, true,
         excludedNodes, BLOCK_SIZE);

TestAddBlockRetry.java

@@ -25,7 +25,7 @@ import static org.mockito.Mockito.spy;
 
 import java.lang.reflect.Field;
 import java.util.EnumSet;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -119,7 +119,7 @@ public class TestAddBlockRetry {
         return ret;
       }
     }).when(spyBM).chooseTarget(Mockito.anyString(), Mockito.anyInt(),
-        Mockito.<DatanodeDescriptor>any(), Mockito.<HashMap<Node, Node>>any(),
+        Mockito.<DatanodeDescriptor>any(), Mockito.<HashSet<Node>>any(),
         Mockito.anyLong(), Mockito.<List<String>>any());
 
     // create file