svn merge -c 1612427 from trunk for HDFS-6680. BlockPlacementPolicyDefault does not choose favored nodes correctly.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1612430 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2014-07-21 23:23:52 +00:00
parent dc688e8d5e
commit 33d82259aa
4 changed files with 59 additions and 31 deletions

View File

@ -337,6 +337,9 @@ Release 2.5.0 - UNRELEASED
HDFS-6493. Change dfs.namenode.startup.delay.block.deletion to second HDFS-6493. Change dfs.namenode.startup.delay.block.deletion to second
instead of millisecond. (Juan Yu via wang) instead of millisecond. (Juan Yu via wang)
HDFS-6680. BlockPlacementPolicyDefault does not choose favored nodes
correctly. (szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn) HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)

View File

@ -145,14 +145,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>(); List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>();
boolean avoidStaleNodes = stats != null boolean avoidStaleNodes = stats != null
&& stats.isAvoidingStaleDataNodesForWrite(); && stats.isAvoidingStaleDataNodesForWrite();
for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) { for (int i = 0; i < favoredNodes.size() && results.size() < numOfReplicas; i++) {
DatanodeDescriptor favoredNode = favoredNodes.get(i); DatanodeDescriptor favoredNode = favoredNodes.get(i);
// Choose a single node which is local to favoredNode. // Choose a single node which is local to favoredNode.
// 'results' is updated within chooseLocalNode // 'results' is updated within chooseLocalNode
final DatanodeStorageInfo target = chooseLocalStorage(favoredNode, final DatanodeStorageInfo target = chooseLocalStorage(favoredNode,
favoriteAndExcludedNodes, blocksize, favoriteAndExcludedNodes, blocksize,
getMaxNodesPerRack(results.size(), numOfReplicas)[1], getMaxNodesPerRack(results.size(), numOfReplicas)[1],
results, avoidStaleNodes, storageType); results, avoidStaleNodes, storageType, false);
if (target == null) { if (target == null) {
LOG.warn("Could not find a target for file " + src LOG.warn("Could not find a target for file " + src
+ " with favored node " + favoredNode); + " with favored node " + favoredNode);
@ -271,7 +271,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
try { try {
if (numOfResults == 0) { if (numOfResults == 0) {
writer = chooseLocalStorage(writer, excludedNodes, blocksize, writer = chooseLocalStorage(writer, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageType) maxNodesPerRack, results, avoidStaleNodes, storageType, true)
.getDatanodeDescriptor(); .getDatanodeDescriptor();
if (--numOfReplicas == 0) { if (--numOfReplicas == 0) {
return writer; return writer;
@ -345,12 +345,14 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
int maxNodesPerRack, int maxNodesPerRack,
List<DatanodeStorageInfo> results, List<DatanodeStorageInfo> results,
boolean avoidStaleNodes, boolean avoidStaleNodes,
StorageType storageType) StorageType storageType,
boolean fallbackToLocalRack)
throws NotEnoughReplicasException { throws NotEnoughReplicasException {
// if no local machine, randomly choose one node // if no local machine, randomly choose one node
if (localMachine == null) if (localMachine == null) {
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageType); maxNodesPerRack, results, avoidStaleNodes, storageType);
}
if (preferLocalNode && localMachine instanceof DatanodeDescriptor) { if (preferLocalNode && localMachine instanceof DatanodeDescriptor) {
DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine; DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
// otherwise try local machine first // otherwise try local machine first
@ -363,7 +365,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
} }
} }
} }
} }
if (!fallbackToLocalRack) {
return null;
}
// try a node on local rack // try a node on local rack
return chooseLocalRack(localMachine, excludedNodes, blocksize, return chooseLocalRack(localMachine, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageType); maxNodesPerRack, results, avoidStaleNodes, storageType);

View File

@ -69,7 +69,8 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
protected DatanodeStorageInfo chooseLocalStorage(Node localMachine, protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
Set<Node> excludedNodes, long blocksize, int maxNodesPerRack, Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
List<DatanodeStorageInfo> results, boolean avoidStaleNodes, List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
StorageType storageType) throws NotEnoughReplicasException { StorageType storageType, boolean fallbackToLocalRack
) throws NotEnoughReplicasException {
// if no local machine, randomly choose one node // if no local machine, randomly choose one node
if (localMachine == null) if (localMachine == null)
return chooseRandom(NodeBase.ROOT, excludedNodes, return chooseRandom(NodeBase.ROOT, excludedNodes,
@ -96,6 +97,10 @@ public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefau
if (chosenStorage != null) { if (chosenStorage != null) {
return chosenStorage; return chosenStorage;
} }
if (!fallbackToLocalRack) {
return null;
}
// try a node on local rack // try a node on local rack
return chooseLocalRack(localMachine, excludedNodes, return chooseLocalRack(localMachine, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType); blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);

View File

@ -18,32 +18,41 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.*; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.Random;
import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Random;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.Test; import org.apache.log4j.Level;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test;
public class TestFavoredNodesEndToEnd { public class TestFavoredNodesEndToEnd {
{
((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class)).getLogger().setLevel(Level.ALL);
}
private static MiniDFSCluster cluster; private static MiniDFSCluster cluster;
private static Configuration conf; private static Configuration conf;
private final static int NUM_DATA_NODES = 10; private final static int NUM_DATA_NODES = 10;
@ -79,7 +88,7 @@ public class TestFavoredNodesEndToEnd {
InetSocketAddress datanode[] = getDatanodes(rand); InetSocketAddress datanode[] = getDatanodes(rand);
Path p = new Path("/filename"+i); Path p = new Path("/filename"+i);
FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true, FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
4096, (short)3, (long)4096, null, datanode); 4096, (short)3, 4096L, null, datanode);
out.write(SOME_BYTES); out.write(SOME_BYTES);
out.close(); out.close();
BlockLocation[] locations = getBlockLocations(p); BlockLocation[] locations = getBlockLocations(p);
@ -98,14 +107,13 @@ public class TestFavoredNodesEndToEnd {
//get some other nodes. In other words, the write to hdfs should not fail //get some other nodes. In other words, the write to hdfs should not fail
//and if we do getBlockLocations on the file, we should see one blklocation //and if we do getBlockLocations on the file, we should see one blklocation
//and three hosts for that //and three hosts for that
Random rand = new Random(System.currentTimeMillis());
InetSocketAddress arbitraryAddrs[] = new InetSocketAddress[3]; InetSocketAddress arbitraryAddrs[] = new InetSocketAddress[3];
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
arbitraryAddrs[i] = getArbitraryLocalHostAddr(); arbitraryAddrs[i] = getArbitraryLocalHostAddr();
} }
Path p = new Path("/filename-foo-bar"); Path p = new Path("/filename-foo-bar");
FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true, FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
4096, (short)3, (long)4096, null, arbitraryAddrs); 4096, (short)3, 4096L, null, arbitraryAddrs);
out.write(SOME_BYTES); out.write(SOME_BYTES);
out.close(); out.close();
getBlockLocations(p); getBlockLocations(p);
@ -113,35 +121,41 @@ public class TestFavoredNodesEndToEnd {
@Test(timeout=180000) @Test(timeout=180000)
public void testWhenSomeNodesAreNotGood() throws Exception { public void testWhenSomeNodesAreNotGood() throws Exception {
// 4 favored nodes
final InetSocketAddress addrs[] = new InetSocketAddress[4];
final String[] hosts = new String[addrs.length];
for (int i = 0; i < addrs.length; i++) {
addrs[i] = datanodes.get(i).getXferAddress();
hosts[i] = addrs[i].getAddress().getHostAddress() + ":" + addrs[i].getPort();
}
//make some datanode not "good" so that even if the client prefers it, //make some datanode not "good" so that even if the client prefers it,
//the namenode would not give it as a replica to write to //the namenode would not give it as a replica to write to
DatanodeInfo d = cluster.getNameNode().getNamesystem().getBlockManager() DatanodeInfo d = cluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanodeByXferAddr( .getDatanodeManager().getDatanodeByXferAddr(
datanodes.get(0).getXferAddress().getAddress().getHostAddress(), addrs[0].getAddress().getHostAddress(), addrs[0].getPort());
datanodes.get(0).getXferAddress().getPort());
//set the decommission status to true so that //set the decommission status to true so that
//BlockPlacementPolicyDefault.isGoodTarget returns false for this dn //BlockPlacementPolicyDefault.isGoodTarget returns false for this dn
d.setDecommissioned(); d.setDecommissioned();
InetSocketAddress addrs[] = new InetSocketAddress[3];
for (int i = 0; i < 3; i++) {
addrs[i] = datanodes.get(i).getXferAddress();
}
Path p = new Path("/filename-foo-bar-baz"); Path p = new Path("/filename-foo-bar-baz");
final short replication = (short)3;
FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true, FSDataOutputStream out = dfs.create(p, FsPermission.getDefault(), true,
4096, (short)3, (long)4096, null, addrs); 4096, replication, 4096L, null, addrs);
out.write(SOME_BYTES); out.write(SOME_BYTES);
out.close(); out.close();
//reset the state //reset the state
d.stopDecommission(); d.stopDecommission();
BlockLocation[] locations = getBlockLocations(p); BlockLocation[] locations = getBlockLocations(p);
Assert.assertEquals(replication, locations[0].getNames().length);;
//also make sure that the datanode[0] is not in the list of hosts //also make sure that the datanode[0] is not in the list of hosts
String datanode0 = for (int i = 0; i < replication; i++) {
datanodes.get(0).getXferAddress().getAddress().getHostAddress() final String loc = locations[0].getNames()[i];
+ ":" + datanodes.get(0).getXferAddress().getPort(); int j = 0;
for (int i = 0; i < 3; i++) { for(; j < hosts.length && !loc.equals(hosts[j]); j++);
if (locations[0].getNames()[i].equals(datanode0)) { Assert.assertTrue("j=" + j, j > 0);
fail(datanode0 + " not supposed to be a replica for the block"); Assert.assertTrue("loc=" + loc + " not in host list "
} + Arrays.asList(hosts) + ", j=" + j, j < hosts.length);
} }
} }