HDFS-6961. Archival Storage: BlockPlacementPolicy#chooseTarget should check each valid storage type in each choosing round.

commit e08701ec71
parent 45d5b13256
Author: Jing Zhao
Date:   2014-09-04 14:19:32 -07:00

6 changed files with 240 additions and 127 deletions
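Why the change: before this patch, chooseTarget consumed the policy's storage types as a positional list, calling storageTypes.remove(0) once per choosing round, so each round was pinned to a single type even when a candidate node offered a different type that was still needed. The patch collapses the list into an EnumMap<StorageType, Integer> of per-type counts (see getRequiredStorageTypes below) and lets every round try each type that still has a nonzero count, decrementing on success. The following is a minimal, self-contained sketch of that accounting scheme; the enum and helper names are illustrative stand-ins, not the committed Hadoop code.

import java.util.Arrays;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class StorageTypeAccountingSketch {
  // Stand-in for org.apache.hadoop.hdfs.StorageType.
  enum StorageType { DISK, SSD, ARCHIVE }

  // Mirrors getRequiredStorageTypes(): collapse the policy's per-replica
  // type list into per-type counts, so list order no longer pins a
  // choosing round to one type.
  static EnumMap<StorageType, Integer> countRequired(List<StorageType> types) {
    EnumMap<StorageType, Integer> map =
        new EnumMap<StorageType, Integer>(StorageType.class);
    for (StorageType t : types) {
      Integer n = map.get(t);
      map.put(t, n == null ? 1 : n + 1);
    }
    return map;
  }

  // One choosing round: try the candidate's type against every type that
  // is still required; on a match, decrement that type's count and drop
  // the entry once it reaches zero.
  static boolean tryPlace(StorageType candidate,
      EnumMap<StorageType, Integer> required) {
    for (Iterator<Map.Entry<StorageType, Integer>> iter =
        required.entrySet().iterator(); iter.hasNext(); ) {
      Map.Entry<StorageType, Integer> entry = iter.next();
      if (entry.getKey() == candidate) { // "is a good target" stand-in
        if (entry.getValue() == 1) {
          iter.remove();                 // this type is fully satisfied
        } else {
          entry.setValue(entry.getValue() - 1);
        }
        return true;
      }
    }
    return false;                        // no still-required type matched
  }

  public static void main(String[] args) {
    // A policy asking for one SSD replica and two DISK replicas.
    EnumMap<StorageType, Integer> required = countRequired(
        Arrays.asList(StorageType.SSD, StorageType.DISK, StorageType.DISK));
    System.out.println(required);                             // {DISK=2, SSD=1}
    System.out.println(tryPlace(StorageType.DISK, required));    // true
    System.out.println(tryPlace(StorageType.ARCHIVE, required)); // false
    System.out.println(required);                             // {DISK=1, SSD=1}
  }
}

The sketch mirrors the bookkeeping that chooseLocalStorage and chooseRandom now perform inline in the hunks below: a successful placement decrements the chosen type's count and removes the entry once it reaches zero, so later rounds only see types that are still required.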

File: BlockStoragePolicy.java

@@ -23,6 +23,7 @@
 import java.util.EnumSet;
 import java.util.LinkedList;
 import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -99,7 +100,8 @@ public class BlockStoragePolicy {
   /** The fallback storage type for replication. */
   private final StorageType[] replicationFallbacks;
 
-  BlockStoragePolicy(byte id, String name, StorageType[] storageTypes,
+  @VisibleForTesting
+  public BlockStoragePolicy(byte id, String name, StorageType[] storageTypes,
       StorageType[] creationFallbacks, StorageType[] replicationFallbacks) {
     this.id = id;
     this.name = name;

File: BlockManager.java

@@ -453,7 +453,7 @@ public class BlockManager {
   }
 
   @VisibleForTesting
-  BlockPlacementPolicy getBlockPlacementPolicy() {
+  public BlockPlacementPolicy getBlockPlacementPolicy() {
    return blockplacement;
   }

File: BlockPlacementPolicyDefault.java

@@ -19,13 +19,7 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import static org.apache.hadoop.util.Time.now;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
+import java.util.*;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -142,8 +136,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       Set<Node> favoriteAndExcludedNodes = excludedNodes == null ?
           new HashSet<Node>() : new HashSet<Node>(excludedNodes);
-      final List<StorageType> storageTypes = storagePolicy.chooseStorageTypes(
-          (short)numOfReplicas);
+      final List<StorageType> requiredStorageTypes = storagePolicy
+          .chooseStorageTypes((short)numOfReplicas);
+      final EnumMap<StorageType, Integer> storageTypes =
+          getRequiredStorageTypes(requiredStorageTypes);
 
       // Choose favored nodes
       List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>();
@@ -156,13 +152,12 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         final DatanodeStorageInfo target = chooseLocalStorage(favoredNode,
             favoriteAndExcludedNodes, blocksize,
             getMaxNodesPerRack(results.size(), numOfReplicas)[1],
-            results, avoidStaleNodes, storageTypes.get(0), false);
+            results, avoidStaleNodes, storageTypes, false);
         if (target == null) {
           LOG.warn("Could not find a target for file " + src
               + " with favored node " + favoredNode);
           continue;
         }
-        storageTypes.remove(0);
         favoriteAndExcludedNodes.add(target.getDatanodeDescriptor());
       }
@@ -241,6 +236,21 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     return new int[] {numOfReplicas, maxNodesPerRack};
   }
 
+  private EnumMap<StorageType, Integer> getRequiredStorageTypes(
+      List<StorageType> types) {
+    EnumMap<StorageType, Integer> map = new EnumMap<StorageType,
+        Integer>(StorageType.class);
+    for (StorageType type : types) {
+      if (!map.containsKey(type)) {
+        map.put(type, 1);
+      } else {
+        int num = map.get(type);
+        map.put(type, num + 1);
+      }
+    }
+    return map;
+  }
+
   /**
    * choose <i>numOfReplicas</i> from all data nodes
    * @param numOfReplicas additional number of replicas wanted
@@ -272,17 +282,18 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
 
     // Keep a copy of original excludedNodes
-    final Set<Node> oldExcludedNodes = avoidStaleNodes ?
-        new HashSet<Node>(excludedNodes) : null;
+    final Set<Node> oldExcludedNodes = new HashSet<Node>(excludedNodes);
 
     // choose storage types; use fallbacks for unavailable storages
-    final List<StorageType> storageTypes = storagePolicy.chooseStorageTypes(
-        (short)totalReplicasExpected, DatanodeStorageInfo.toStorageTypes(results),
-        unavailableStorages, newBlock);
-    StorageType curStorageType = null;
+    final List<StorageType> requiredStorageTypes = storagePolicy
+        .chooseStorageTypes((short) totalReplicasExpected,
+            DatanodeStorageInfo.toStorageTypes(results),
+            unavailableStorages, newBlock);
+    final EnumMap<StorageType, Integer> storageTypes =
+        getRequiredStorageTypes(requiredStorageTypes);
 
     try {
-      if ((numOfReplicas = storageTypes.size()) == 0) {
+      if ((numOfReplicas = requiredStorageTypes.size()) == 0) {
        throw new NotEnoughReplicasException(
            "All required storage types are unavailable: "
            + " unavailableStorages=" + unavailableStorages
@@ -290,9 +301,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       }
 
       if (numOfResults == 0) {
-        curStorageType = storageTypes.remove(0);
         writer = chooseLocalStorage(writer, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, curStorageType, true)
+            maxNodesPerRack, results, avoidStaleNodes, storageTypes, true)
                 .getDatanodeDescriptor();
         if (--numOfReplicas == 0) {
           return writer;
@@ -300,33 +310,30 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       }
       final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
       if (numOfResults <= 1) {
-        curStorageType = storageTypes.remove(0);
         chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
-            results, avoidStaleNodes, curStorageType);
+            results, avoidStaleNodes, storageTypes);
         if (--numOfReplicas == 0) {
           return writer;
         }
       }
       if (numOfResults <= 2) {
         final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
-        curStorageType = storageTypes.remove(0);
         if (clusterMap.isOnSameRack(dn0, dn1)) {
           chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
-              results, avoidStaleNodes, curStorageType);
+              results, avoidStaleNodes, storageTypes);
         } else if (newBlock){
           chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
-              results, avoidStaleNodes, curStorageType);
+              results, avoidStaleNodes, storageTypes);
         } else {
           chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
-              results, avoidStaleNodes, curStorageType);
+              results, avoidStaleNodes, storageTypes);
         }
         if (--numOfReplicas == 0) {
           return writer;
         }
       }
-      curStorageType = storageTypes.remove(0);
       chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, curStorageType);
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e) {
       final String message = "Failed to place enough replicas, still in need of "
           + (totalReplicasExpected - results.size()) + " to reach "
@@ -355,10 +362,22 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
             newBlock);
       }
 
-      if (storageTypes.size() > 0) {
-        // Retry chooseTarget with fallback storage types
-        unavailableStorages.add(curStorageType);
-        return chooseTarget(numOfReplicas, writer, excludedNodes, blocksize,
+      boolean retry = false;
+      // simply add all the remaining types into unavailableStorages and give
+      // another try. No best effort is guaranteed here.
+      for (StorageType type : storageTypes.keySet()) {
+        if (!unavailableStorages.contains(type)) {
+          unavailableStorages.add(type);
+          retry = true;
+        }
+      }
+      if (retry) {
+        for (DatanodeStorageInfo resultStorage : results) {
+          addToExcludedNodes(resultStorage.getDatanodeDescriptor(),
+              oldExcludedNodes);
+        }
+        numOfReplicas = totalReplicasExpected - results.size();
+        return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
             maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
             newBlock);
       }
@@ -373,28 +392,35 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * @return the chosen storage
    */
   protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
-      Set<Node> excludedNodes,
-      long blocksize,
-      int maxNodesPerRack,
-      List<DatanodeStorageInfo> results,
-      boolean avoidStaleNodes,
-      StorageType storageType,
-      boolean fallbackToLocalRack)
+      Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
+      EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack)
       throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, storageType);
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     }
     if (preferLocalNode && localMachine instanceof DatanodeDescriptor) {
       DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
       // otherwise try local machine first
       if (excludedNodes.add(localMachine)) { // was not in the excluded list
-        for(DatanodeStorageInfo localStorage : DFSUtil.shuffle(
-            localDatanode.getStorageInfos())) {
-          if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
-              maxNodesPerRack, false, results, avoidStaleNodes, storageType) >= 0) {
-            return localStorage;
-          }
-        }
+        for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
+            .entrySet().iterator(); iter.hasNext(); ) {
+          Map.Entry<StorageType, Integer> entry = iter.next();
+          for (DatanodeStorageInfo localStorage : DFSUtil.shuffle(
+              localDatanode.getStorageInfos())) {
+            StorageType type = entry.getKey();
+            if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
+                maxNodesPerRack, false, results, avoidStaleNodes, type) >= 0) {
+              int num = entry.getValue();
+              if (num == 1) {
+                iter.remove();
+              } else {
+                entry.setValue(num - 1);
+              }
+              return localStorage;
+            }
+          }
+        }
       }
     }
@@ -405,7 +431,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes, blocksize,
-        maxNodesPerRack, results, avoidStaleNodes, storageType);
+        maxNodesPerRack, results, avoidStaleNodes, storageTypes);
   }
 
   /**
@@ -428,23 +454,23 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * @return the chosen node
    */
   protected DatanodeStorageInfo chooseLocalRack(Node localMachine,
       Set<Node> excludedNodes,
       long blocksize,
       int maxNodesPerRack,
       List<DatanodeStorageInfo> results,
       boolean avoidStaleNodes,
-      StorageType storageType)
+      EnumMap<StorageType, Integer> storageTypes)
       throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, storageType);
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     }
 
     // choose one from the local rack
     try {
       return chooseRandom(localMachine.getNetworkLocation(), excludedNodes,
-          blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+          blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       DatanodeDescriptor newLocal=null;
@@ -458,16 +484,17 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       if (newLocal != null) {
         try {
           return chooseRandom(newLocal.getNetworkLocation(), excludedNodes,
-              blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+              blocksize, maxNodesPerRack, results, avoidStaleNodes,
+              storageTypes);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
           return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-              maxNodesPerRack, results, avoidStaleNodes, storageType);
+              maxNodesPerRack, results, avoidStaleNodes, storageTypes);
         }
       } else {
         //otherwise randomly choose one from the network
         return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, storageType);
+            maxNodesPerRack, results, avoidStaleNodes, storageTypes);
       }
     }
   }
@@ -486,18 +513,18 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       int maxReplicasPerRack,
       List<DatanodeStorageInfo> results,
       boolean avoidStaleNodes,
-      StorageType storageType)
+      EnumMap<StorageType, Integer> storageTypes)
       throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
     // randomly choose one node from remote racks
     try {
       chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
           excludedNodes, blocksize, maxReplicasPerRack, results,
-          avoidStaleNodes, storageType);
+          avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e) {
       chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
           localMachine.getNetworkLocation(), excludedNodes, blocksize,
-          maxReplicasPerRack, results, avoidStaleNodes, storageType);
+          maxReplicasPerRack, results, avoidStaleNodes, storageTypes);
     }
   }
@@ -511,10 +538,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       int maxNodesPerRack,
       List<DatanodeStorageInfo> results,
       boolean avoidStaleNodes,
-      StorageType storageType)
+      EnumMap<StorageType, Integer> storageTypes)
       throws NotEnoughReplicasException {
     return chooseRandom(1, scope, excludedNodes, blocksize, maxNodesPerRack,
-        results, avoidStaleNodes, storageType);
+        results, avoidStaleNodes, storageTypes);
   }
 
   /**
@@ -528,8 +555,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       int maxNodesPerRack,
       List<DatanodeStorageInfo> results,
       boolean avoidStaleNodes,
-      StorageType storageType)
+      EnumMap<StorageType, Integer> storageTypes)
       throws NotEnoughReplicasException {
     int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(
         scope, excludedNodes);
@@ -549,18 +576,31 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       final DatanodeStorageInfo[] storages = DFSUtil.shuffle(
           chosenNode.getStorageInfos());
-      int i;
-      for(i = 0; i < storages.length; i++) {
-        final int newExcludedNodes = addIfIsGoodTarget(storages[i],
-            excludedNodes, blocksize, maxNodesPerRack, considerLoad, results,
-            avoidStaleNodes, storageType);
-        if (newExcludedNodes >= 0) {
-          numOfReplicas--;
-          if (firstChosen == null) {
-            firstChosen = storages[i];
-          }
-          numOfAvailableNodes -= newExcludedNodes;
-          break;
-        }
-      }
+      int i = 0;
+      boolean search = true;
+      for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
+          .entrySet().iterator(); search && iter.hasNext(); ) {
+        Map.Entry<StorageType, Integer> entry = iter.next();
+        for (i = 0; i < storages.length; i++) {
+          StorageType type = entry.getKey();
+          final int newExcludedNodes = addIfIsGoodTarget(storages[i],
+              excludedNodes, blocksize, maxNodesPerRack, considerLoad, results,
+              avoidStaleNodes, type);
+          if (newExcludedNodes >= 0) {
+            numOfReplicas--;
+            if (firstChosen == null) {
+              firstChosen = storages[i];
+            }
+            numOfAvailableNodes -= newExcludedNodes;
+            int num = entry.getValue();
+            if (num == 1) {
+              iter.remove();
+            } else {
+              entry.setValue(num - 1);
+            }
+            search = false;
+            break;
+          }
+        }
+      }
       }
     }
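A note on the retry path in the chooseTarget hunk above: the old code retried with only the single type that had just failed (curStorageType) marked unavailable, while the new code marks every still-required type unavailable, re-excludes the already-chosen nodes through the preserved oldExcludedNodes copy, and recurses once so the policy's fallback types get a chance; as the new comment says, no best effort is guaranteed. A small sketch of the retry decision follows; it assumes set semantics for unavailableStorages and uses stand-in names, not the committed code.

import java.util.EnumMap;
import java.util.EnumSet;

public class RetryDecisionSketch {
  // Stand-in for org.apache.hadoop.hdfs.StorageType.
  enum StorageType { DISK, SSD, ARCHIVE }

  // Mirrors the retry block: every type still required after a failed pass
  // is marked unavailable; retry only if that actually changed the set,
  // since otherwise a second pass could not do any better.
  static boolean markUnavailableAndDecide(
      EnumMap<StorageType, Integer> remaining,
      EnumSet<StorageType> unavailable) {
    boolean retry = false;
    for (StorageType type : remaining.keySet()) {
      if (unavailable.add(type)) { // add() returns false if already present
        retry = true;
      }
    }
    return retry;
  }

  public static void main(String[] args) {
    EnumMap<StorageType, Integer> remaining =
        new EnumMap<StorageType, Integer>(StorageType.class);
    remaining.put(StorageType.SSD, 1); // one SSD replica still unplaced
    EnumSet<StorageType> unavailable = EnumSet.noneOf(StorageType.class);
    System.out.println(markUnavailableAndDecide(remaining, unavailable)); // true
    System.out.println(markUnavailableAndDecide(remaining, unavailable)); // false
    System.out.println(unavailable); // [SSD]
  }
}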

File: BlockPlacementPolicyWithNodeGroup.java

@@ -17,12 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -70,22 +65,33 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
   protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
-      StorageType storageType, boolean fallbackToLocalRack
-      ) throws NotEnoughReplicasException {
+      EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack)
+      throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
       return chooseRandom(NodeBase.ROOT, excludedNodes,
-          blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+          blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
 
     // otherwise try local machine first
     if (localMachine instanceof DatanodeDescriptor) {
       DatanodeDescriptor localDataNode = (DatanodeDescriptor)localMachine;
       if (excludedNodes.add(localMachine)) { // was not in the excluded list
-        for(DatanodeStorageInfo localStorage : DFSUtil.shuffle(
-            localDataNode.getStorageInfos())) {
-          if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
-              maxNodesPerRack, false, results, avoidStaleNodes, storageType) >= 0) {
-            return localStorage;
-          }
-        }
+        for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
+            .entrySet().iterator(); iter.hasNext(); ) {
+          Map.Entry<StorageType, Integer> entry = iter.next();
+          for (DatanodeStorageInfo localStorage : DFSUtil.shuffle(
+              localDataNode.getStorageInfos())) {
+            StorageType type = entry.getKey();
+            if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
+                maxNodesPerRack, false, results, avoidStaleNodes, type) >= 0) {
+              int num = entry.getValue();
+              if (num == 1) {
+                iter.remove();
+              } else {
+                entry.setValue(num - 1);
+              }
+              return localStorage;
+            }
+          }
+        }
       }
     }
@@ -94,7 +100,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
     // try a node on local node group
     DatanodeStorageInfo chosenStorage = chooseLocalNodeGroup(
         (NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes,
-        blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+        blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     if (chosenStorage != null) {
       return chosenStorage;
     }
@@ -104,7 +110,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
     }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes,
-        blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+        blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);
   }
 
   /** @return the node of the second replica */
@@ -124,18 +130,19 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
   protected DatanodeStorageInfo chooseLocalRack(Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
-      StorageType storageType) throws NotEnoughReplicasException {
+      EnumMap<StorageType, Integer> storageTypes) throws
+      NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, storageType);
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     }
 
     // choose one from the local rack, but off-nodegroup
     try {
       final String scope = NetworkTopology.getFirstHalf(localMachine.getNetworkLocation());
       return chooseRandom(scope, excludedNodes, blocksize, maxNodesPerRack,
-          results, avoidStaleNodes, storageType);
+          results, avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       final DatanodeDescriptor newLocal = secondNode(localMachine, results);
@@ -143,16 +150,17 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
         try {
           return chooseRandom(
               clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes,
-              blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+              blocksize, maxNodesPerRack, results, avoidStaleNodes,
+              storageTypes);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
           return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-              maxNodesPerRack, results, avoidStaleNodes, storageType);
+              maxNodesPerRack, results, avoidStaleNodes, storageTypes);
         }
       } else {
         //otherwise randomly choose one from the network
         return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, storageType);
+            maxNodesPerRack, results, avoidStaleNodes, storageTypes);
       }
     }
   }
@@ -161,8 +169,8 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
   protected void chooseRemoteRack(int numOfReplicas,
       DatanodeDescriptor localMachine, Set<Node> excludedNodes,
       long blocksize, int maxReplicasPerRack, List<DatanodeStorageInfo> results,
-      boolean avoidStaleNodes, StorageType storageType)
+      boolean avoidStaleNodes, EnumMap<StorageType, Integer> storageTypes)
       throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
     final String rackLocation = NetworkTopology.getFirstHalf(
@@ -170,12 +178,12 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
     try {
       // randomly choose from remote racks
       chooseRandom(numOfReplicas, "~" + rackLocation, excludedNodes, blocksize,
-          maxReplicasPerRack, results, avoidStaleNodes, storageType);
+          maxReplicasPerRack, results, avoidStaleNodes, storageTypes);
     } catch (NotEnoughReplicasException e) {
       // fall back to the local rack
       chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
           rackLocation, excludedNodes, blocksize,
-          maxReplicasPerRack, results, avoidStaleNodes, storageType);
+          maxReplicasPerRack, results, avoidStaleNodes, storageTypes);
     }
   }
@@ -189,11 +197,12 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
       NetworkTopologyWithNodeGroup clusterMap, Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
-      StorageType storageType) throws NotEnoughReplicasException {
+      EnumMap<StorageType, Integer> storageTypes) throws
+      NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-          maxNodesPerRack, results, avoidStaleNodes, storageType);
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
     }
 
     // choose one from the local node group
@@ -201,7 +210,7 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
       return chooseRandom(
           clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
           excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
-          storageType);
+          storageTypes);
     } catch (NotEnoughReplicasException e1) {
       final DatanodeDescriptor newLocal = secondNode(localMachine, results);
       if (newLocal != null) {
@@ -209,16 +218,16 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
           return chooseRandom(
               clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
               excludedNodes, blocksize, maxNodesPerRack, results,
-              avoidStaleNodes, storageType);
+              avoidStaleNodes, storageTypes);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
           return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-              maxNodesPerRack, results, avoidStaleNodes, storageType);
+              maxNodesPerRack, results, avoidStaleNodes, storageTypes);
         }
       } else {
         //otherwise randomly choose one from the network
         return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, storageType);
+            maxNodesPerRack, results, avoidStaleNodes, storageTypes);
       }
     }
   }

File: DFSTestUtil.java

@@ -951,9 +951,14 @@ public class DFSTestUtil {
   public static DatanodeStorageInfo[] createDatanodeStorageInfos(int n) {
     return createDatanodeStorageInfos(n, null, null);
   }
 
   public static DatanodeStorageInfo[] createDatanodeStorageInfos(
       int n, String[] racks, String[] hostnames) {
+    return createDatanodeStorageInfos(n, racks, hostnames, null);
+  }
+
+  public static DatanodeStorageInfo[] createDatanodeStorageInfos(
+      int n, String[] racks, String[] hostnames, StorageType[] types) {
     DatanodeStorageInfo[] storages = new DatanodeStorageInfo[n];
     for(int i = storages.length; i > 0; ) {
       final String storageID = "s" + i;
@@ -961,16 +966,30 @@ public class DFSTestUtil {
       i--;
       final String rack = (racks!=null && i < racks.length)? racks[i]: "defaultRack";
       final String hostname = (hostnames!=null && i < hostnames.length)? hostnames[i]: "host";
-      storages[i] = createDatanodeStorageInfo(storageID, ip, rack, hostname);
+      final StorageType type = (types != null && i < types.length) ? types[i]
+          : StorageType.DEFAULT;
+      storages[i] = createDatanodeStorageInfo(storageID, ip, rack, hostname,
+          type);
     }
     return storages;
   }
 
   public static DatanodeStorageInfo createDatanodeStorageInfo(
       String storageID, String ip, String rack, String hostname) {
-    final DatanodeStorage storage = new DatanodeStorage(storageID);
-    final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor(ip, rack, storage, hostname);
+    return createDatanodeStorageInfo(storageID, ip, rack, hostname,
+        StorageType.DEFAULT);
+  }
+
+  public static DatanodeStorageInfo createDatanodeStorageInfo(
+      String storageID, String ip, String rack, String hostname,
+      StorageType type) {
+    final DatanodeStorage storage = new DatanodeStorage(storageID,
+        DatanodeStorage.State.NORMAL, type);
+    final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor(
+        ip, rack, storage, hostname);
     return BlockManagerTestUtil.newDatanodeStorageInfo(dn, storage);
   }
 
   public static DatanodeDescriptor[] toDatanodeDescriptor(
       DatanodeStorageInfo[] storages) {
     DatanodeDescriptor[] datanodes = new DatanodeDescriptor[storages.length];

File: TestBlockStoragePolicy.java

@@ -19,23 +19,26 @@
 package org.apache.hadoop.hdfs;
 
 import static org.apache.hadoop.hdfs.BlockStoragePolicy.ID_UNSPECIFIED;
 
+import java.io.File;
 import java.io.FileNotFoundException;
-import java.util.Arrays;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
+import org.apache.hadoop.hdfs.server.blockmanagement.*;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.PathUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -838,9 +841,7 @@ public class TestBlockStoragePolicy {
       checkDirectoryListing(dirList, WARM, COLD); // bar is warm, foo is cold
       checkDirectoryListing(barList, WARM, HOT);
     } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+      cluster.shutdown();
     }
   }
@@ -920,9 +921,7 @@ public class TestBlockStoragePolicy {
       checkDirectoryListing(fs.getClient().listPaths(s1foo.toString(),
           HdfsFileStatus.EMPTY_NAME).getPartialListing(), COLD, HOT);
     } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-      }
+      cluster.shutdown();
     }
   }
@@ -937,9 +936,7 @@ public class TestBlockStoragePolicy {
   private void checkLocatedBlocks(HdfsLocatedFileStatus status, int blockNum,
       int replicaNum, StorageType... types) {
     List<StorageType> typeList = Lists.newArrayList();
-    for (StorageType type : types) {
-      typeList.add(type);
-    }
+    Collections.addAll(typeList, types);
     LocatedBlocks lbs = status.getBlockLocations();
     Assert.assertEquals(blockNum, lbs.getLocatedBlocks().size());
     for (LocatedBlock lb : lbs.getLocatedBlocks()) {
@@ -1029,4 +1026,50 @@ public class TestBlockStoragePolicy {
         new StorageType[]{StorageType.ARCHIVE, StorageType.ARCHIVE,
             StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE});
   }
+
+  @Test
+  public void testChooseTargetWithTopology() throws Exception {
+    BlockStoragePolicy policy1 = new BlockStoragePolicy((byte) 9, "TEST1",
+        new StorageType[]{StorageType.SSD, StorageType.DISK,
+            StorageType.ARCHIVE}, new StorageType[]{}, new StorageType[]{});
+    BlockStoragePolicy policy2 = new BlockStoragePolicy((byte) 11, "TEST2",
+        new StorageType[]{StorageType.DISK, StorageType.SSD,
+            StorageType.ARCHIVE}, new StorageType[]{}, new StorageType[]{});
+
+    final String[] racks = {"/d1/r1", "/d1/r2", "/d1/r2"};
+    final String[] hosts = {"host1", "host2", "host3"};
+    final StorageType[] types = {StorageType.DISK, StorageType.SSD,
+        StorageType.ARCHIVE};
+
+    final DatanodeStorageInfo[] storages = DFSTestUtil
+        .createDatanodeStorageInfos(3, racks, hosts, types);
+    final DatanodeDescriptor[] dataNodes = DFSTestUtil
+        .toDatanodeDescriptor(storages);
+
+    FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+    File baseDir = PathUtils.getTestDir(TestReplicationPolicy.class);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        new File(baseDir, "name").getPath());
+    DFSTestUtil.formatNameNode(conf);
+    NameNode namenode = new NameNode(conf);
+
+    final BlockManager bm = namenode.getNamesystem().getBlockManager();
+    BlockPlacementPolicy replicator = bm.getBlockPlacementPolicy();
+    NetworkTopology cluster = bm.getDatanodeManager().getNetworkTopology();
+    for (DatanodeDescriptor datanode : dataNodes) {
+      cluster.add(datanode);
+    }
+
+    DatanodeStorageInfo[] targets = replicator.chooseTarget("/foo", 3,
+        dataNodes[0], Collections.<DatanodeStorageInfo>emptyList(), false,
+        new HashSet<Node>(), 0, policy1);
+    System.out.println(Arrays.asList(targets));
+    Assert.assertEquals(3, targets.length);
+    targets = replicator.chooseTarget("/foo", 3,
+        dataNodes[0], Collections.<DatanodeStorageInfo>emptyList(), false,
+        new HashSet<Node>(), 0, policy2);
+    System.out.println(Arrays.asList(targets));
+    Assert.assertEquals(3, targets.length);
+  }
 }