From 44d9bb26d640ca5c1de651563c7993b4ecd6b653 Mon Sep 17 00:00:00 2001
From: Tsz-wo Sze
Date: Mon, 21 Jul 2014 23:21:12 +0000
Subject: [PATCH] HDFS-6680. BlockPlacementPolicyDefault does not choose
 favored nodes correctly.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1612427 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt |  3 +
 .../BlockPlacementPolicyDefault.java        | 18 ++++--
 .../BlockPlacementPolicyWithNodeGroup.java  |  7 ++-
 .../namenode/TestFavoredNodesEndToEnd.java  | 62 ++++++++++++-------
 4 files changed, 59 insertions(+), 31 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 37878727536..ec75643e539 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -589,6 +589,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6493. Change dfs.namenode.startup.delay.block.deletion to second
     instead of millisecond. (Juan Yu via wang)
 
+    HDFS-6680. BlockPlacementPolicyDefault does not choose favored nodes
+    correctly. (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index accdddf5250..e2026c1dfbb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -145,14 +145,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>();
       boolean avoidStaleNodes = stats != null
           && stats.isAvoidingStaleDataNodesForWrite();
-      for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) {
+      for (int i = 0; i < favoredNodes.size() && results.size() < numOfReplicas; i++) {
         DatanodeDescriptor favoredNode = favoredNodes.get(i);
         // Choose a single node which is local to favoredNode.
         // 'results' is updated within chooseLocalNode
         final DatanodeStorageInfo target = chooseLocalStorage(favoredNode,
             favoriteAndExcludedNodes, blocksize,
             getMaxNodesPerRack(results.size(), numOfReplicas)[1],
-            results, avoidStaleNodes, storageType);
+            results, avoidStaleNodes, storageType, false);
         if (target == null) {
           LOG.warn("Could not find a target for file " + src
               + " with favored node " + favoredNode);
@@ -271,7 +271,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     try {
       if (numOfResults == 0) {
         writer = chooseLocalStorage(writer, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes, storageType)
+            maxNodesPerRack, results, avoidStaleNodes, storageType, true)
             .getDatanodeDescriptor();
         if (--numOfReplicas == 0) {
           return writer;
@@ -345,12 +345,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
                                              int maxNodesPerRack,
                                              List<DatanodeStorageInfo> results,
                                              boolean avoidStaleNodes,
-                                             StorageType storageType)
+                                             StorageType storageType,
+                                             boolean fallbackToLocalRack)
       throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
-    if (localMachine == null)
+    if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
           maxNodesPerRack, results, avoidStaleNodes, storageType);
+    }
     if (preferLocalNode && localMachine instanceof DatanodeDescriptor) {
       DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
       // otherwise try local machine first
@@ -363,7 +365,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
           }
         }
       }
-    }
+    }
+
+    if (!fallbackToLocalRack) {
+      return null;
+    }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes, blocksize,
         maxNodesPerRack, results, avoidStaleNodes, storageType);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
index 1069b4e3d75..b3ff6b9b1f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
@@ -70,7 +70,8 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
   protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
       List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
-      StorageType storageType) throws NotEnoughReplicasException {
+      StorageType storageType, boolean fallbackToLocalRack
+      ) throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
       return chooseRandom(NodeBase.ROOT, excludedNodes,
@@ -97,6 +98,10 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
     if (chosenStorage != null) {
       return chosenStorage;
     }
+
+    if (!fallbackToLocalRack) {
+      return null;
+    }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes, blocksize,
         maxNodesPerRack, results, avoidStaleNodes, storageType);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
index ea5bb7a91ed..4f110372be8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java
@@ -18,32 +18,41 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
-import java.util.ArrayList;
-import java.util.Random;
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Random;
 
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.junit.Test;
+import org.apache.log4j.Level;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Test;
 
 public class TestFavoredNodesEndToEnd {
+  {
+    ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class)).getLogger().setLevel(Level.ALL);
+  }
+
   private static MiniDFSCluster cluster;
   private static Configuration conf;
   private final static int NUM_DATA_NODES = 10;
@@ -79,7 +88,7 @@ public class TestFavoredNodesEndToEnd {
       InetSocketAddress datanode[] = getDatanodes(rand);
       Path p = new Path("/filename"+i);
       FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
-          4096, (short)3, (long)4096, null, datanode);
+          4096, (short)3, 4096L, null, datanode);
       out.write(SOME_BYTES);
       out.close();
       BlockLocation[] locations = getBlockLocations(p);
@@ -98,14 +107,13 @@
     //get some other nodes. In other words, the write to hdfs should not fail
     //and if we do getBlockLocations on the file, we should see one blklocation
     //and three hosts for that
-    Random rand = new Random(System.currentTimeMillis());
     InetSocketAddress arbitraryAddrs[] = new InetSocketAddress[3];
     for (int i = 0; i < 3; i++) {
       arbitraryAddrs[i] = getArbitraryLocalHostAddr();
     }
     Path p = new Path("/filename-foo-bar");
     FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
-        4096, (short)3, (long)4096, null, arbitraryAddrs);
+        4096, (short)3, 4096L, null, arbitraryAddrs);
     out.write(SOME_BYTES);
     out.close();
     getBlockLocations(p);
@@ -113,35 +121,41 @@ public class TestFavoredNodesEndToEnd {
   }
 
   @Test(timeout=180000)
   public void testWhenSomeNodesAreNotGood() throws Exception {
+    // 4 favored nodes
+    final InetSocketAddress addrs[] = new InetSocketAddress[4];
+    final String[] hosts = new String[addrs.length];
+    for (int i = 0; i < addrs.length; i++) {
+      addrs[i] = datanodes.get(i).getXferAddress();
+      hosts[i] = addrs[i].getAddress().getHostAddress() + ":" + addrs[i].getPort();
+    }
+
     //make some datanode not "good" so that even if the client prefers it,
     //the namenode would not give it as a replica to write to
     DatanodeInfo d = cluster.getNameNode().getNamesystem().getBlockManager()
         .getDatanodeManager().getDatanodeByXferAddr(
-            datanodes.get(0).getXferAddress().getAddress().getHostAddress(),
-            datanodes.get(0).getXferAddress().getPort());
+            addrs[0].getAddress().getHostAddress(), addrs[0].getPort());
     //set the decommission status to true so that
     //BlockPlacementPolicyDefault.isGoodTarget returns false for this dn
     d.setDecommissioned();
-    InetSocketAddress addrs[] = new InetSocketAddress[3];
-    for (int i = 0; i < 3; i++) {
-      addrs[i] = datanodes.get(i).getXferAddress();
-    }
     Path p = new Path("/filename-foo-bar-baz");
+    final short replication = (short)3;
     FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
-        4096, (short)3, (long)4096, null, addrs);
+        4096, replication, 4096L, null, addrs);
     out.write(SOME_BYTES);
     out.close();
     //reset the state
     d.stopDecommission();
+
     BlockLocation[] locations = getBlockLocations(p);
+    Assert.assertEquals(replication, locations[0].getNames().length);
     //also make sure that the datanode[0] is not in the list of hosts
-    String datanode0 =
-        datanodes.get(0).getXferAddress().getAddress().getHostAddress()
-        + ":" + datanodes.get(0).getXferAddress().getPort();
-    for (int i = 0; i < 3; i++) {
-      if (locations[0].getNames()[i].equals(datanode0)) {
-        fail(datanode0 + " not supposed to be a replica for the block");
-      }
+    for (int i = 0; i < replication; i++) {
+      final String loc = locations[0].getNames()[i];
+      int j = 0;
+      for(; j < hosts.length && !loc.equals(hosts[j]); j++);
+      Assert.assertTrue("j=" + j, j > 0);
+      Assert.assertTrue("loc=" + loc + " not in host list "
+          + Arrays.asList(hosts) + ", j=" + j, j < hosts.length);
     }
   }
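
Why the change works: before this patch, chooseTarget(..) tried only the
first min(favoredNodes.size(), numOfReplicas) favored nodes, and
chooseLocalStorage(..) quietly fell back to an arbitrary node on the local
rack when a favored node was not a usable target. A single bad favored node
(for example the decommissioned one in the new test) therefore both wasted
an attempt and could hand back a non-favored replica, while usable favored
nodes further down the list were never tried. The patch walks the whole
favored list until enough targets are chosen, and the new fallbackToLocalRack
flag makes chooseLocalStorage(..) return null on a failed favored-node
attempt (the caller then logs a warning and moves on), while passing true at
the ordinary writer-local call site keeps the old fallback behaviour there.

The following is a minimal, self-contained Java sketch of just the loop-bound
change; it is illustrative only, not HDFS code. The class name and the
stubbed chooseLocalStorage (which accepts even-numbered nodes and rejects odd
ones) are invented for the demonstration.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FavoredNodesLoopSketch {
  // Stub standing in for chooseLocalStorage(): even-numbered nodes are
  // "good" targets; for odd ones it returns null when the local-rack
  // fallback is disabled, which is the favored-node case after this patch.
  static String chooseLocalStorage(int favoredNode, boolean fallbackToLocalRack) {
    if (favoredNode % 2 == 0) {
      return "storage-on-node-" + favoredNode;
    }
    return fallbackToLocalRack ? "storage-on-some-local-rack-node" : null;
  }

  public static void main(String[] args) {
    List<Integer> favoredNodes = Arrays.asList(1, 2, 3, 4, 5, 6);
    int numOfReplicas = 3;

    // Old bound: only the first min(#favored, #replicas) entries are tried,
    // so the bad nodes 1 and 3 consume two of the three attempts.
    List<String> oldResults = new ArrayList<String>();
    for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) {
      String target = chooseLocalStorage(favoredNodes.get(i), false);
      if (target != null) {
        oldResults.add(target);
      }
    }

    // New bound: keep walking the favored list until enough targets are found.
    List<String> newResults = new ArrayList<String>();
    for (int i = 0; i < favoredNodes.size() && newResults.size() < numOfReplicas; i++) {
      String target = chooseLocalStorage(favoredNodes.get(i), false);
      if (target != null) {
        newResults.add(target);
      }
    }

    // Prints: old chose [storage-on-node-2]
    //         new chose [storage-on-node-2, storage-on-node-4, storage-on-node-6]
    System.out.println("old chose " + oldResults);
    System.out.println("new chose " + newResults);
  }
}

With six favored nodes of which 1, 3 and 5 are bad and three replicas wanted,
the old bound examines only nodes 1, 2 and 3 and comes back with a single
target; the new bound keeps scanning and fills all three slots from the
favored list. In HDFS itself, any remaining shortfall after the favored-node
loop is still made up by the normal placement path (not shown in this diff).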