HDFS-8884. Fail-fast check in BlockPlacementPolicyDefault#chooseTarget. (yliu)

This commit is contained in:
yliu 2015-08-20 20:07:18 +08:00
parent 36b1a1e784
commit 80a29906bc
4 changed files with 161 additions and 102 deletions

View File

@ -819,6 +819,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8917. Cleanup BlockInfoUnderConstruction from comments and tests. HDFS-8917. Cleanup BlockInfoUnderConstruction from comments and tests.
(Zhe Zhang via jing9) (Zhe Zhang via jing9)
HDFS-8884. Fail-fast check in BlockPlacementPolicyDefault#chooseTarget.
(yliu)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -438,16 +438,10 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
return writer; return writer;
} }
/**
* Choose <i>localMachine</i> as the target.
* if <i>localMachine</i> is not available,
* choose a node on the same rack
* @return the chosen storage
*/
protected DatanodeStorageInfo chooseLocalStorage(Node localMachine, protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
Set<Node> excludedNodes, long blocksize, int maxNodesPerRack, Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
List<DatanodeStorageInfo> results, boolean avoidStaleNodes, List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack) EnumMap<StorageType, Integer> storageTypes)
throws NotEnoughReplicasException { throws NotEnoughReplicasException {
// if no local machine, randomly choose one node // if no local machine, randomly choose one node
if (localMachine == null) { if (localMachine == null) {
@ -458,7 +452,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
&& clusterMap.contains(localMachine)) { && clusterMap.contains(localMachine)) {
DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine; DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
// otherwise try local machine first // otherwise try local machine first
if (excludedNodes.add(localMachine)) { // was not in the excluded list if (excludedNodes.add(localMachine) // was not in the excluded list
&& isGoodDatanode(localDatanode, maxNodesPerRack, false,
results, avoidStaleNodes)) {
for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
.entrySet().iterator(); iter.hasNext(); ) { .entrySet().iterator(); iter.hasNext(); ) {
Map.Entry<StorageType, Integer> entry = iter.next(); Map.Entry<StorageType, Integer> entry = iter.next();
@ -466,7 +462,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
localDatanode.getStorageInfos())) { localDatanode.getStorageInfos())) {
StorageType type = entry.getKey(); StorageType type = entry.getKey();
if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize, if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
maxNodesPerRack, false, results, avoidStaleNodes, type) >= 0) { results, type) >= 0) {
int num = entry.getValue(); int num = entry.getValue();
if (num == 1) { if (num == 1) {
iter.remove(); iter.remove();
@ -479,6 +475,26 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
} }
} }
} }
return null;
}
/**
* Choose <i>localMachine</i> as the target.
* if <i>localMachine</i> is not available,
* choose a node on the same rack
* @return the chosen storage
*/
protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack)
throws NotEnoughReplicasException {
DatanodeStorageInfo localStorage = chooseLocalStorage(localMachine,
excludedNodes, blocksize, maxNodesPerRack, results,
avoidStaleNodes, storageTypes);
if (localStorage != null) {
return localStorage;
}
if (!fallbackToLocalRack) { if (!fallbackToLocalRack) {
return null; return null;
@ -653,6 +669,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
builder.append("\nNode ").append(NodeBase.getPath(chosenNode)).append(" ["); builder.append("\nNode ").append(NodeBase.getPath(chosenNode)).append(" [");
} }
numOfAvailableNodes--; numOfAvailableNodes--;
if (!isGoodDatanode(chosenNode, maxNodesPerRack, considerLoad,
results, avoidStaleNodes)) {
if (LOG.isDebugEnabled()) {
builder.append("\n]");
}
badTarget = true;
continue;
}
final DatanodeStorageInfo[] storages = DFSUtil.shuffle( final DatanodeStorageInfo[] storages = DFSUtil.shuffle(
chosenNode.getStorageInfos()); chosenNode.getStorageInfos());
@ -664,8 +688,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
for (i = 0; i < storages.length; i++) { for (i = 0; i < storages.length; i++) {
StorageType type = entry.getKey(); StorageType type = entry.getKey();
final int newExcludedNodes = addIfIsGoodTarget(storages[i], final int newExcludedNodes = addIfIsGoodTarget(storages[i],
excludedNodes, blocksize, maxNodesPerRack, considerLoad, results, excludedNodes, blocksize, results, type);
avoidStaleNodes, type);
if (newExcludedNodes >= 0) { if (newExcludedNodes >= 0) {
numOfReplicas--; numOfReplicas--;
if (firstChosen == null) { if (firstChosen == null) {
@ -725,13 +748,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
int addIfIsGoodTarget(DatanodeStorageInfo storage, int addIfIsGoodTarget(DatanodeStorageInfo storage,
Set<Node> excludedNodes, Set<Node> excludedNodes,
long blockSize, long blockSize,
int maxNodesPerRack,
boolean considerLoad,
List<DatanodeStorageInfo> results, List<DatanodeStorageInfo> results,
boolean avoidStaleNodes,
StorageType storageType) { StorageType storageType) {
if (isGoodTarget(storage, blockSize, maxNodesPerRack, considerLoad, if (isGoodTarget(storage, blockSize, results, storageType)) {
results, avoidStaleNodes, storageType)) {
results.add(storage); results.add(storage);
// add node and related nodes to excludedNode // add node and related nodes to excludedNode
return addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes); return addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
@ -749,11 +768,20 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
} }
} }
/**
 * Record, for debug logging, why {@code node} was rejected as a target.
 * The message is accumulated in the thread-local builder and emitted later.
 */
private static void logNodeIsNotChosen(DatanodeDescriptor node,
    String reason) {
  // Skip all work unless debug logging is actually enabled.
  if (!LOG.isDebugEnabled()) {
    return;
  }
  // build the error message for later use.
  debugLoggingBuilder.get()
      .append("\n Datanode ").append(node)
      .append(" is not chosen since ").append(reason).append(".");
}
/** /**
* Determine if a storage is a good target. * Determine if a datanode is good for placing a block.
* *
* @param storage The target storage * @param node The target datanode
* @param blockSize Size of block
* @param maxTargetPerRack Maximum number of targets per rack. The value of * @param maxTargetPerRack Maximum number of targets per rack. The value of
* this parameter depends on the number of racks in * this parameter depends on the number of racks in
* the cluster and total number of replicas for a block * the cluster and total number of replicas for a block
@ -761,15 +789,65 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
* @param results A list containing currently chosen nodes. Used to check if * @param results A list containing currently chosen nodes. Used to check if
* too many nodes has been chosen in the target rack. * too many nodes has been chosen in the target rack.
* @param avoidStaleNodes Whether or not to avoid choosing stale nodes * @param avoidStaleNodes Whether or not to avoid choosing stale nodes
* @return Return true if <i>node</i> has enough space, * @return Return true if the datanode is a good candidate, otherwise false
* does not have too much load, */
* and the rack does not have too many nodes. boolean isGoodDatanode(DatanodeDescriptor node,
int maxTargetPerRack, boolean considerLoad,
List<DatanodeStorageInfo> results,
boolean avoidStaleNodes) {
// check if the node is (being) decommissioned
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
logNodeIsNotChosen(node, "the node is (being) decommissioned ");
return false;
}
if (avoidStaleNodes) {
if (node.isStale(this.staleInterval)) {
logNodeIsNotChosen(node, "the node is stale ");
return false;
}
}
// check the communication traffic of the target machine
if (considerLoad) {
final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
final int nodeLoad = node.getXceiverCount();
if (nodeLoad > maxLoad) {
logNodeIsNotChosen(node, "the node is too busy (load: " + nodeLoad
+ " > " + maxLoad + ") ");
return false;
}
}
// check if the target rack has chosen too many nodes
String rackname = node.getNetworkLocation();
int counter=1;
for(DatanodeStorageInfo resultStorage : results) {
if (rackname.equals(
resultStorage.getDatanodeDescriptor().getNetworkLocation())) {
counter++;
}
}
if (counter > maxTargetPerRack) {
logNodeIsNotChosen(node, "the rack has too many chosen nodes ");
return false;
}
return true;
}
/**
* Determine if a storage is a good target.
*
* @param storage The target storage
* @param blockSize Size of block
* @param results A list containing currently chosen nodes. Used to check if
* too many nodes has been chosen in the target rack.
* @return Return true if <i>node</i> has enough space.
*/ */
private boolean isGoodTarget(DatanodeStorageInfo storage, private boolean isGoodTarget(DatanodeStorageInfo storage,
long blockSize, int maxTargetPerRack, long blockSize,
boolean considerLoad,
List<DatanodeStorageInfo> results, List<DatanodeStorageInfo> results,
boolean avoidStaleNodes,
StorageType requiredStorageType) { StorageType requiredStorageType) {
if (storage.getStorageType() != requiredStorageType) { if (storage.getStorageType() != requiredStorageType) {
logNodeIsNotChosen(storage, "storage types do not match," logNodeIsNotChosen(storage, "storage types do not match,"
@ -787,18 +865,6 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
} }
DatanodeDescriptor node = storage.getDatanodeDescriptor(); DatanodeDescriptor node = storage.getDatanodeDescriptor();
// check if the node is (being) decommissioned
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
logNodeIsNotChosen(storage, "the node is (being) decommissioned ");
return false;
}
if (avoidStaleNodes) {
if (node.isStale(this.staleInterval)) {
logNodeIsNotChosen(storage, "the node is stale ");
return false;
}
}
final long requiredSize = blockSize * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE; final long requiredSize = blockSize * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE;
final long scheduledSize = blockSize * node.getBlocksScheduled(storage.getStorageType()); final long scheduledSize = blockSize * node.getBlocksScheduled(storage.getStorageType());
@ -812,30 +878,6 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
return false; return false;
} }
// check the communication traffic of the target machine
if (considerLoad) {
final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
final int nodeLoad = node.getXceiverCount();
if (nodeLoad > maxLoad) {
logNodeIsNotChosen(storage, "the node is too busy (load: " + nodeLoad
+ " > " + maxLoad + ") ");
return false;
}
}
// check if the target rack has chosen too many nodes
String rackname = node.getNetworkLocation();
int counter=1;
for(DatanodeStorageInfo resultStorage : results) {
if (rackname.equals(
resultStorage.getDatanodeDescriptor().getNetworkLocation())) {
counter++;
}
}
if (counter>maxTargetPerRack) {
logNodeIsNotChosen(storage, "the rack has too many chosen nodes ");
return false;
}
return true; return true;
} }

View File

@ -20,9 +20,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.*; import java.util.*;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NetworkTopologyWithNodeGroup; import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
@ -67,35 +65,12 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
List<DatanodeStorageInfo> results, boolean avoidStaleNodes, List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack) EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack)
throws NotEnoughReplicasException { throws NotEnoughReplicasException {
// if no local machine, randomly choose one node DatanodeStorageInfo localStorage = chooseLocalStorage(localMachine,
if (localMachine == null) excludedNodes, blocksize, maxNodesPerRack, results,
return chooseRandom(NodeBase.ROOT, excludedNodes, avoidStaleNodes, storageTypes);
blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes); if (localStorage != null) {
// otherwise try local machine first
if (localMachine instanceof DatanodeDescriptor) {
DatanodeDescriptor localDataNode = (DatanodeDescriptor)localMachine;
if (excludedNodes.add(localMachine)) { // was not in the excluded list
for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
.entrySet().iterator(); iter.hasNext(); ) {
Map.Entry<StorageType, Integer> entry = iter.next();
for (DatanodeStorageInfo localStorage : DFSUtil.shuffle(
localDataNode.getStorageInfos())) {
StorageType type = entry.getKey();
if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
maxNodesPerRack, false, results, avoidStaleNodes, type) >= 0) {
int num = entry.getValue();
if (num == 1) {
iter.remove();
} else {
entry.setValue(num - 1);
}
return localStorage; return localStorage;
} }
}
}
}
}
// try a node on local node group // try a node on local node group
DatanodeStorageInfo chosenStorage = chooseLocalNodeGroup( DatanodeStorageInfo chosenStorage = chooseLocalNodeGroup(

View File

@ -29,8 +29,11 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.net.StaticMapping; import org.apache.hadoop.net.StaticMapping;
import org.junit.After; import org.junit.After;
@ -81,7 +84,37 @@ public class TestDefaultBlockPlacementPolicy {
// Map client to RACK2 // Map client to RACK2
String clientRack = "/RACK2"; String clientRack = "/RACK2";
StaticMapping.addNodeToRack(clientMachine, clientRack); StaticMapping.addNodeToRack(clientMachine, clientRack);
testPlacement(clientMachine, clientRack); testPlacement(clientMachine, clientRack, true);
}
/**
 * Verify that a client running on a datanode host gets its block's first
 * replica placed on local storage.
 */
@Test
public void testLocalStoragePlacement() throws Exception {
  // "/host3" is itself a datanode on RACK3, so the local node must win.
  testPlacement("/host3", "/RACK3", true);
}
/**
* Verify decommissioned nodes should not be selected.
*/
@Test
public void testPlacementWithLocalRackNodesDecommissioned() throws Exception {
String clientMachine = "client.foo.com";
// Map client to RACK3
String clientRack = "/RACK3";
StaticMapping.addNodeToRack(clientMachine, clientRack);
final DatanodeManager dnm = namesystem.getBlockManager().getDatanodeManager();
// Datanode 3 is expected to be the only node on the client's rack;
// assert that before decommissioning it.
DatanodeDescriptor dnd3 = dnm.getDatanode(
cluster.getDataNodes().get(3).getDatanodeId());
assertEquals(dnd3.getNetworkLocation(), clientRack);
// Decommission the rack-local node so the placement policy must skip it
// and choose replicas on other racks (hasBlockReplicaOnRack = false).
dnm.getDecomManager().startDecommission(dnd3);
try {
testPlacement(clientMachine, clientRack, false);
} finally {
// Recommission the node so subsequent tests see a healthy cluster.
dnm.getDecomManager().stopDecommission(dnd3);
}
} }
/** /**
@ -93,11 +126,11 @@ public class TestDefaultBlockPlacementPolicy {
// Don't map client machine to any rack, // Don't map client machine to any rack,
// so by default it will be treated as /default-rack // so by default it will be treated as /default-rack
// in that case a random node should be selected as first node. // in that case a random node should be selected as first node.
testPlacement(clientMachine, null); testPlacement(clientMachine, null, true);
} }
private void testPlacement(String clientMachine, private void testPlacement(String clientMachine,
String clientRack) throws IOException { String clientRack, boolean hasBlockReplicaOnRack) throws IOException {
// write 5 files and check whether all times block placed // write 5 files and check whether all times block placed
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
String src = "/test-" + i; String src = "/test-" + i;
@ -111,8 +144,14 @@ public class TestDefaultBlockPlacementPolicy {
assertEquals("Block should be allocated sufficient locations", assertEquals("Block should be allocated sufficient locations",
REPLICATION_FACTOR, locatedBlock.getLocations().length); REPLICATION_FACTOR, locatedBlock.getLocations().length);
if (clientRack != null) { if (clientRack != null) {
if (hasBlockReplicaOnRack) {
assertEquals("First datanode should be rack local", clientRack, assertEquals("First datanode should be rack local", clientRack,
locatedBlock.getLocations()[0].getNetworkLocation()); locatedBlock.getLocations()[0].getNetworkLocation());
} else {
for (DatanodeInfo dni : locatedBlock.getLocations()) {
assertNotEquals(clientRack, dni.getNetworkLocation());
}
}
} }
nameNodeRpc.abandonBlock(locatedBlock.getBlock(), fileStatus.getFileId(), nameNodeRpc.abandonBlock(locatedBlock.getBlock(), fileStatus.getFileId(),
src, clientMachine); src, clientMachine);