diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 080f0d4afeb..a0ca52a2b18 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -819,6 +819,9 @@ Release 2.8.0 - UNRELEASED HDFS-8917. Cleanup BlockInfoUnderConstruction from comments and tests. (Zhe Zhang via jing9) + HDFS-8884. Fail-fast check in BlockPlacementPolicyDefault#chooseTarget. + (yliu) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 9023e0a12b3..3aea5c9b686 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -437,17 +437,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { maxNodesPerRack, results, avoidStaleNodes, storageTypes); return writer; } - - /** - * Choose localMachine as the target. 
- * if localMachine is not available, - * choose a node on the same rack - * @return the chosen storage - */ + protected DatanodeStorageInfo chooseLocalStorage(Node localMachine, Set excludedNodes, long blocksize, int maxNodesPerRack, List results, boolean avoidStaleNodes, - EnumMap storageTypes, boolean fallbackToLocalRack) + EnumMap storageTypes) throws NotEnoughReplicasException { // if no local machine, randomly choose one node if (localMachine == null) { @@ -458,7 +452,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { && clusterMap.contains(localMachine)) { DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine; // otherwise try local machine first - if (excludedNodes.add(localMachine)) { // was not in the excluded list + if (excludedNodes.add(localMachine) // was not in the excluded list + && isGoodDatanode(localDatanode, maxNodesPerRack, false, + results, avoidStaleNodes)) { for (Iterator> iter = storageTypes .entrySet().iterator(); iter.hasNext(); ) { Map.Entry entry = iter.next(); @@ -466,7 +462,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { localDatanode.getStorageInfos())) { StorageType type = entry.getKey(); if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize, - maxNodesPerRack, false, results, avoidStaleNodes, type) >= 0) { + results, type) >= 0) { int num = entry.getValue(); if (num == 1) { iter.remove(); @@ -479,6 +475,26 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { } } } + return null; + } + + /** + * Choose localMachine as the target. 
+ * if localMachine is not available, + * choose a node on the same rack + * @return the chosen storage + */ + protected DatanodeStorageInfo chooseLocalStorage(Node localMachine, + Set excludedNodes, long blocksize, int maxNodesPerRack, + List results, boolean avoidStaleNodes, + EnumMap storageTypes, boolean fallbackToLocalRack) + throws NotEnoughReplicasException { + DatanodeStorageInfo localStorage = chooseLocalStorage(localMachine, + excludedNodes, blocksize, maxNodesPerRack, results, + avoidStaleNodes, storageTypes); + if (localStorage != null) { + return localStorage; + } if (!fallbackToLocalRack) { return null; @@ -653,6 +669,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { builder.append("\nNode ").append(NodeBase.getPath(chosenNode)).append(" ["); } numOfAvailableNodes--; + if (!isGoodDatanode(chosenNode, maxNodesPerRack, considerLoad, + results, avoidStaleNodes)) { + if (LOG.isDebugEnabled()) { + builder.append("\n]"); + } + badTarget = true; + continue; + } final DatanodeStorageInfo[] storages = DFSUtil.shuffle( chosenNode.getStorageInfos()); @@ -664,8 +688,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { for (i = 0; i < storages.length; i++) { StorageType type = entry.getKey(); final int newExcludedNodes = addIfIsGoodTarget(storages[i], - excludedNodes, blocksize, maxNodesPerRack, considerLoad, results, - avoidStaleNodes, type); + excludedNodes, blocksize, results, type); if (newExcludedNodes >= 0) { numOfReplicas--; if (firstChosen == null) { @@ -725,13 +748,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { int addIfIsGoodTarget(DatanodeStorageInfo storage, Set excludedNodes, long blockSize, - int maxNodesPerRack, - boolean considerLoad, - List results, - boolean avoidStaleNodes, + List results, StorageType storageType) { - if (isGoodTarget(storage, blockSize, maxNodesPerRack, considerLoad, - results, avoidStaleNodes, storageType)) { + if (isGoodTarget(storage, 
blockSize, results, storageType)) { results.add(storage); // add node and related nodes to excludedNode return addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes); @@ -749,27 +768,86 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { } } + private static void logNodeIsNotChosen(DatanodeDescriptor node, + String reason) { + if (LOG.isDebugEnabled()) { + // build the error message for later use. + debugLoggingBuilder.get() + .append("\n Datanode ").append(node) + .append(" is not chosen since ").append(reason).append("."); + } + } + /** - * Determine if a storage is a good target. - * - * @param storage The target storage - * @param blockSize Size of block - * @param maxTargetPerRack Maximum number of targets per rack. The value of - * this parameter depends on the number of racks in + * Determine if a datanode is good for placing block. + * + * @param node The target datanode + * @param maxTargetPerRack Maximum number of targets per rack. The value of + * this parameter depends on the number of racks in * the cluster and total number of replicas for a block * @param considerLoad whether or not to consider load of the target node - * @param results A list containing currently chosen nodes. Used to check if + * @param results A list containing currently chosen nodes. Used to check if * too many nodes has been chosen in the target rack. * @param avoidStaleNodes Whether or not to avoid choosing stale nodes - * @return Return true if node has enough space, - * does not have too much load, - * and the rack does not have too many nodes. 
+ * @return Return true if the datanode is good candidate, otherwise false + */ + boolean isGoodDatanode(DatanodeDescriptor node, + int maxTargetPerRack, boolean considerLoad, + List results, + boolean avoidStaleNodes) { + // check if the node is (being) decommissioned + if (node.isDecommissionInProgress() || node.isDecommissioned()) { + logNodeIsNotChosen(node, "the node is (being) decommissioned "); + return false; + } + + if (avoidStaleNodes) { + if (node.isStale(this.staleInterval)) { + logNodeIsNotChosen(node, "the node is stale "); + return false; + } + } + + // check the communication traffic of the target machine + if (considerLoad) { + final double maxLoad = 2.0 * stats.getInServiceXceiverAverage(); + final int nodeLoad = node.getXceiverCount(); + if (nodeLoad > maxLoad) { + logNodeIsNotChosen(node, "the node is too busy (load: " + nodeLoad + + " > " + maxLoad + ") "); + return false; + } + } + + // check if the target rack has chosen too many nodes + String rackname = node.getNetworkLocation(); + int counter=1; + for(DatanodeStorageInfo resultStorage : results) { + if (rackname.equals( + resultStorage.getDatanodeDescriptor().getNetworkLocation())) { + counter++; + } + } + if (counter > maxTargetPerRack) { + logNodeIsNotChosen(node, "the rack has too many chosen nodes "); + return false; + } + + return true; + } + + /** + * Determine if a storage is a good target. + * + * @param storage The target storage + * @param blockSize Size of block + * @param results A list containing currently chosen nodes. Used to check if + * too many nodes has been chosen in the target rack. + * @return Return true if node has enough space. 
*/ private boolean isGoodTarget(DatanodeStorageInfo storage, - long blockSize, int maxTargetPerRack, - boolean considerLoad, + long blockSize, List results, - boolean avoidStaleNodes, StorageType requiredStorageType) { if (storage.getStorageType() != requiredStorageType) { logNodeIsNotChosen(storage, "storage types do not match," @@ -787,19 +865,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { } DatanodeDescriptor node = storage.getDatanodeDescriptor(); - // check if the node is (being) decommissioned - if (node.isDecommissionInProgress() || node.isDecommissioned()) { - logNodeIsNotChosen(storage, "the node is (being) decommissioned "); - return false; - } - if (avoidStaleNodes) { - if (node.isStale(this.staleInterval)) { - logNodeIsNotChosen(storage, "the node is stale "); - return false; - } - } - final long requiredSize = blockSize * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE; final long scheduledSize = blockSize * node.getBlocksScheduled(storage.getStorageType()); final long remaining = node.getRemaining(storage.getStorageType()); @@ -812,30 +878,6 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { return false; } - // check the communication traffic of the target machine - if (considerLoad) { - final double maxLoad = 2.0 * stats.getInServiceXceiverAverage(); - final int nodeLoad = node.getXceiverCount(); - if (nodeLoad > maxLoad) { - logNodeIsNotChosen(storage, "the node is too busy (load: " + nodeLoad - + " > " + maxLoad + ") "); - return false; - } - } - - // check if the target rack has chosen too many nodes - String rackname = node.getNetworkLocation(); - int counter=1; - for(DatanodeStorageInfo resultStorage : results) { - if (rackname.equals( - resultStorage.getDatanodeDescriptor().getNetworkLocation())) { - counter++; - } - } - if (counter>maxTargetPerRack) { - logNodeIsNotChosen(storage, "the rack has too many chosen nodes "); - return false; - } return true; } diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java index 28a1e56dad1..b1c4b7819e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java @@ -20,9 +20,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.*; import org.apache.hadoop.conf.Configuration; - import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopologyWithNodeGroup; @@ -67,34 +65,11 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau List results, boolean avoidStaleNodes, EnumMap storageTypes, boolean fallbackToLocalRack) throws NotEnoughReplicasException { - // if no local machine, randomly choose one node - if (localMachine == null) - return chooseRandom(NodeBase.ROOT, excludedNodes, - blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes); - - // otherwise try local machine first - if (localMachine instanceof DatanodeDescriptor) { - DatanodeDescriptor localDataNode = (DatanodeDescriptor)localMachine; - if (excludedNodes.add(localMachine)) { // was not in the excluded list - for (Iterator> iter = storageTypes - .entrySet().iterator(); iter.hasNext(); ) { - Map.Entry entry = iter.next(); - for (DatanodeStorageInfo localStorage : DFSUtil.shuffle( - localDataNode.getStorageInfos())) { - StorageType type = entry.getKey(); - if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize, - maxNodesPerRack, false, results, avoidStaleNodes, type) >= 
0) { - int num = entry.getValue(); - if (num == 1) { - iter.remove(); - } else { - entry.setValue(num - 1); - } - return localStorage; - } - } - } - } + DatanodeStorageInfo localStorage = chooseLocalStorage(localMachine, + excludedNodes, blocksize, maxNodesPerRack, results, + avoidStaleNodes, storageTypes); + if (localStorage != null) { + return localStorage; } // try a node on local node group diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java index 38daddc9a01..5709cee8be7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java @@ -29,8 +29,11 @@ import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.net.StaticMapping; import org.junit.After; @@ -81,7 +84,37 @@ public class TestDefaultBlockPlacementPolicy { // Map client to RACK2 String clientRack = "/RACK2"; StaticMapping.addNodeToRack(clientMachine, clientRack); - testPlacement(clientMachine, clientRack); + testPlacement(clientMachine, clientRack, true); + } + + /** + * Verify local node selection + */ + @Test + public void testLocalStoragePlacement() throws Exception { + 
String clientMachine = "/host3"; + testPlacement(clientMachine, "/RACK3", true); + } + + /** + * Verify decommissioned nodes should not be selected. + */ + @Test + public void testPlacementWithLocalRackNodesDecommissioned() throws Exception { + String clientMachine = "client.foo.com"; + // Map client to RACK3 + String clientRack = "/RACK3"; + StaticMapping.addNodeToRack(clientMachine, clientRack); + final DatanodeManager dnm = namesystem.getBlockManager().getDatanodeManager(); + DatanodeDescriptor dnd3 = dnm.getDatanode( + cluster.getDataNodes().get(3).getDatanodeId()); + assertEquals(dnd3.getNetworkLocation(), clientRack); + dnm.getDecomManager().startDecommission(dnd3); + try { + testPlacement(clientMachine, clientRack, false); + } finally { + dnm.getDecomManager().stopDecommission(dnd3); + } } /** @@ -93,11 +126,11 @@ public class TestDefaultBlockPlacementPolicy { // Don't map client machine to any rack, // so by default it will be treated as /default-rack // in that case a random node should be selected as first node. 
- testPlacement(clientMachine, null); + testPlacement(clientMachine, null, true); } private void testPlacement(String clientMachine, - String clientRack) throws IOException { + String clientRack, boolean hasBlockReplicaOnRack) throws IOException { // write 5 files and check whether all times block placed for (int i = 0; i < 5; i++) { String src = "/test-" + i; @@ -111,8 +144,14 @@ public class TestDefaultBlockPlacementPolicy { assertEquals("Block should be allocated sufficient locations", REPLICATION_FACTOR, locatedBlock.getLocations().length); if (clientRack != null) { - assertEquals("First datanode should be rack local", clientRack, - locatedBlock.getLocations()[0].getNetworkLocation()); + if (hasBlockReplicaOnRack) { + assertEquals("First datanode should be rack local", clientRack, + locatedBlock.getLocations()[0].getNetworkLocation()); + } else { + for (DatanodeInfo dni : locatedBlock.getLocations()) { + assertNotEquals(clientRack, dni.getNetworkLocation()); + } + } } nameNodeRpc.abandonBlock(locatedBlock.getBlock(), fileStatus.getFileId(), src, clientMachine);