diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
index e1d29689aec..1e23ff687e1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java
@@ -29,13 +29,13 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -54,8 +54,8 @@ import com.google.common.collect.Lists;
 public class NetworkTopology {
   public final static String DEFAULT_RACK = "/default-rack";
   public final static int DEFAULT_HOST_LEVEL = 2;
-  public static final Log LOG =
-    LogFactory.getLog(NetworkTopology.class);
+  public static final Logger LOG =
+      LoggerFactory.getLogger(NetworkTopology.class);
 
   public static class InvalidTopologyException extends RuntimeException {
     private static final long serialVersionUID = 1L;
@@ -442,9 +442,7 @@ public class NetworkTopology {
           }
         }
       }
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("NetworkTopology became:\n" + this.toString());
-      }
+      LOG.debug("NetworkTopology became:\n{}", this.toString());
     } finally {
       netlock.writeLock().unlock();
     }
@@ -517,9 +515,7 @@ public class NetworkTopology {
           numOfRacks--;
         }
       }
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("NetworkTopology became:\n" + this.toString());
-      }
+      LOG.debug("NetworkTopology became:\n{}", this.toString());
     } finally {
       netlock.writeLock().unlock();
     }
@@ -717,26 +713,45 @@ public class NetworkTopology {
     r.setSeed(seed);
   }
 
-  /** randomly choose one node from scope
-   * if scope starts with ~, choose one from the all nodes except for the
-   * ones in scope; otherwise, choose one from scope
+  /**
+   * Randomly choose a node.
+   *
    * @param scope range of nodes from which a node will be chosen
    * @return the chosen node
+   *
+   * @see #chooseRandom(String, Collection)
    */
-  public Node chooseRandom(String scope) {
+  public Node chooseRandom(final String scope) {
+    return chooseRandom(scope, null);
+  }
+
+  /**
+   * Randomly choose one node from scope.
+   *
+   * If scope starts with ~, choose one from all the nodes except for the
+   * ones in scope; otherwise, choose one from scope.
+   * If excludedNodes is given, choose a node that's not in excludedNodes.
+   *
+   * @param scope range of nodes from which a node will be chosen
+   * @param excludedNodes nodes to be excluded from the choice
+   * @return the chosen node
+   */
+  public Node chooseRandom(final String scope,
+      final Collection<Node> excludedNodes) {
     netlock.readLock().lock();
     try {
       if (scope.startsWith("~")) {
-        return chooseRandom(NodeBase.ROOT, scope.substring(1));
+        return chooseRandom(NodeBase.ROOT, scope.substring(1), excludedNodes);
       } else {
-        return chooseRandom(scope, null);
+        return chooseRandom(scope, null, excludedNodes);
       }
     } finally {
       netlock.readLock().unlock();
     }
   }
 
-  private Node chooseRandom(String scope, String excludedScope){
+  private Node chooseRandom(final String scope, String excludedScope,
+      final Collection<Node> excludedNodes) {
     if (excludedScope != null) {
       if (scope.startsWith(excludedScope)) {
         return null;
@@ -747,7 +762,8 @@ public class NetworkTopology {
     }
     Node node = getNode(scope);
     if (!(node instanceof InnerNode)) {
-      return node;
+      return excludedNodes != null && excludedNodes.contains(node) ?
+          null : node;
     }
     InnerNode innerNode = (InnerNode)node;
     int numOfDatanodes = innerNode.getNumOfLeaves();
@@ -762,12 +778,36 @@ public class NetworkTopology {
       }
     }
     if (numOfDatanodes == 0) {
-      throw new InvalidTopologyException(
-          "Failed to find datanode (scope=\"" + String.valueOf(scope) +
-          "\" excludedScope=\"" + String.valueOf(excludedScope) + "\").");
+      LOG.warn("Failed to find datanode (scope=\"{}\" excludedScope=\"{}\").",
+          String.valueOf(scope), String.valueOf(excludedScope));
+      return null;
     }
-    int leaveIndex = r.nextInt(numOfDatanodes);
-    return innerNode.getLeaf(leaveIndex, node);
+    Node ret = null;
+    final int availableNodes;
+    if (excludedScope == null) {
+      availableNodes = countNumOfAvailableNodes(scope, excludedNodes);
+    } else {
+      availableNodes =
+          countNumOfAvailableNodes("~" + excludedScope, excludedNodes);
+    }
+    LOG.debug("Choosing random from {} available nodes on node {},"
+        + " scope={}, excludedScope={}, excludedNodes={}", availableNodes,
+        innerNode.toString(), scope, excludedScope, excludedNodes);
+    if (availableNodes > 0) {
+      do {
+        int leaveIndex = r.nextInt(numOfDatanodes);
+        ret = innerNode.getLeaf(leaveIndex, node);
+        if (excludedNodes == null || !excludedNodes.contains(ret)) {
+          break;
+        } else {
+          LOG.debug("Node {} is excluded, continuing.", ret);
+        }
+        // We've counted numOfAvailableNodes inside the lock, so there must be
+        // at least 1 satisfying node. Keep trying until we find it.
+      } while (true);
+    }
+    LOG.debug("chooseRandom returning {}", ret);
+    return ret;
   }
 
   /** return leaves in scope
@@ -795,6 +835,7 @@ public class NetworkTopology {
    * @param excludedNodes a list of nodes
    * @return number of available nodes
    */
+  @VisibleForTesting
   public int countNumOfAvailableNodes(String scope,
       Collection<Node> excludedNodes) {
     boolean isExcluded=false;
@@ -807,16 +848,18 @@ public class NetworkTopology {
     int excludedCountOffScope = 0; // the number of nodes outside scope & excludedNodes
     netlock.readLock().lock();
     try {
-      for (Node node : excludedNodes) {
-        node = getNode(NodeBase.getPath(node));
-        if (node == null) {
-          continue;
-        }
-        if ((NodeBase.getPath(node) + NodeBase.PATH_SEPARATOR_STR)
-            .startsWith(scope + NodeBase.PATH_SEPARATOR_STR)) {
-          excludedCountInScope++;
-        } else {
-          excludedCountOffScope++;
+      if (excludedNodes != null) {
+        for (Node node : excludedNodes) {
+          node = getNode(NodeBase.getPath(node));
+          if (node == null) {
+            continue;
+          }
+          if ((NodeBase.getPath(node) + NodeBase.PATH_SEPARATOR_STR)
+              .startsWith(scope + NodeBase.PATH_SEPARATOR_STR)) {
+            excludedCountInScope++;
+          } else {
+            excludedCountOffScope++;
+          }
         }
       }
       Node n = getNode(scope);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/AvailableSpaceBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/AvailableSpaceBlockPlacementPolicy.java
index 74c1c786e87..fcd01bd2361 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/AvailableSpaceBlockPlacementPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/AvailableSpaceBlockPlacementPolicy.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT;
 
+import java.util.Collection;
 import java.util.Random;
 
 import org.apache.commons.logging.Log;
@@ -28,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
 
 /**
  * Space balanced block placement policy.
@@ -68,9 +70,12 @@ public class AvailableSpaceBlockPlacementPolicy extends
   }
 
   @Override
-  protected DatanodeDescriptor chooseDataNode(String scope) {
-    DatanodeDescriptor a = (DatanodeDescriptor) clusterMap.chooseRandom(scope);
-    DatanodeDescriptor b = (DatanodeDescriptor) clusterMap.chooseRandom(scope);
+  protected DatanodeDescriptor chooseDataNode(final String scope,
+      final Collection<Node> excludedNode) {
+    DatanodeDescriptor a =
+        (DatanodeDescriptor) clusterMap.chooseRandom(scope, excludedNode);
+    DatanodeDescriptor b =
+        (DatanodeDescriptor) clusterMap.chooseRandom(scope, excludedNode);
     int ret = compareDataNode(a, b);
     if (ret == 0) {
       return a;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index 3f1e09ae129..19f4dde2577 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.util.Time.monotonicNow;
 
 import java.util.*;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.AddBlockFlag;
@@ -643,10 +644,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
   /**
    * Choose numOfReplicas nodes from the racks
    * that localMachine is NOT on.
-   * if not enough nodes are available, choose the remaining ones
+   * If not enough nodes are available, choose the remaining ones
    * from the local rack
    */
-
   protected void chooseRemoteRack(int numOfReplicas,
                                   DatanodeDescriptor localMachine,
                                   Set<Node> excludedNodes,
@@ -702,10 +702,6 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
       boolean avoidStaleNodes,
       EnumMap<StorageType, Integer> storageTypes)
       throws NotEnoughReplicasException {
-
-    int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(
-        scope, excludedNodes);
-    int refreshCounter = numOfAvailableNodes;
     StringBuilder builder = null;
     if (LOG.isDebugEnabled()) {
       builder = debugLoggingBuilder.get();
@@ -714,37 +710,39 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
     }
     boolean badTarget = false;
     DatanodeStorageInfo firstChosen = null;
-    while(numOfReplicas > 0 && numOfAvailableNodes > 0) {
-      DatanodeDescriptor chosenNode = chooseDataNode(scope);
-      if (excludedNodes.add(chosenNode)) { //was not in the excluded list
-        if (LOG.isDebugEnabled()) {
-          builder.append("\nNode ").append(NodeBase.getPath(chosenNode)).append(" [");
-        }
-        numOfAvailableNodes--;
-        DatanodeStorageInfo storage = null;
-        if (isGoodDatanode(chosenNode, maxNodesPerRack, considerLoad,
-            results, avoidStaleNodes)) {
-          for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
-              .entrySet().iterator(); iter.hasNext(); ) {
-            Map.Entry<StorageType, Integer> entry = iter.next();
-            storage = chooseStorage4Block(
-                chosenNode, blocksize, results, entry.getKey());
-            if (storage != null) {
-              numOfReplicas--;
-              if (firstChosen == null) {
-                firstChosen = storage;
-              }
-              // add node and related nodes to excludedNode
-              numOfAvailableNodes -=
-                  addToExcludedNodes(chosenNode, excludedNodes);
-              int num = entry.getValue();
-              if (num == 1) {
-                iter.remove();
-              } else {
-                entry.setValue(num - 1);
-              }
-              break;
+    while (numOfReplicas > 0) {
+      DatanodeDescriptor chosenNode = chooseDataNode(scope, excludedNodes);
+      if (chosenNode == null) {
+        break;
+      }
+      Preconditions.checkState(excludedNodes.add(chosenNode), "chosenNode "
+          + chosenNode + " is already in excludedNodes " + excludedNodes);
+      if (LOG.isDebugEnabled()) {
+        builder.append("\nNode ").append(NodeBase.getPath(chosenNode))
+            .append(" [");
+      }
+      DatanodeStorageInfo storage = null;
+      if (isGoodDatanode(chosenNode, maxNodesPerRack, considerLoad,
+          results, avoidStaleNodes)) {
+        for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
+            .entrySet().iterator(); iter.hasNext();) {
+          Map.Entry<StorageType, Integer> entry = iter.next();
+          storage = chooseStorage4Block(
+              chosenNode, blocksize, results, entry.getKey());
+          if (storage != null) {
+            numOfReplicas--;
+            if (firstChosen == null) {
+              firstChosen = storage;
             }
+            // add node (subclasses may also add related nodes) to excludedNodes
+            addToExcludedNodes(chosenNode, excludedNodes);
+            int num = entry.getValue();
+            if (num == 1) {
+              iter.remove();
+            } else {
+              entry.setValue(num - 1);
+            }
+            break;
           }
         }
@@ -755,16 +753,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
         // If no candidate storage was found on this DN then set badTarget.
         badTarget = (storage == null);
       }
-      // Refresh the node count. If the live node count became smaller,
-      // but it is not reflected in this loop, it may loop forever in case
-      // the replicas/rack cannot be satisfied.
-      if (--refreshCounter == 0) {
-        numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(scope,
-            excludedNodes);
-        refreshCounter = numOfAvailableNodes;
-      }
     }
-
     if (numOfReplicas>0) {
       String detail = enableDebugLogging;
       if (LOG.isDebugEnabled()) {
@@ -785,8 +774,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
    * Choose a datanode from the given scope.
    * @return the chosen node, if there is any.
    */
-  protected DatanodeDescriptor chooseDataNode(final String scope) {
-    return (DatanodeDescriptor) clusterMap.chooseRandom(scope);
+  protected DatanodeDescriptor chooseDataNode(final String scope,
+      final Collection<Node> excludedNodes) {
+    return (DatanodeDescriptor) clusterMap.chooseRandom(scope, excludedNodes);
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
index 4626507c409..7c6f44e763a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
@@ -81,7 +81,6 @@ import org.apache.hadoop.hdfs.web.resources.*;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.Credentials;
@@ -225,7 +224,7 @@ public class NamenodeWebHdfsMethods {
     }
 
     return (DatanodeDescriptor)bm.getDatanodeManager().getNetworkTopology(
-        ).chooseRandom(NodeBase.ROOT);
+        ).chooseRandom(NodeBase.ROOT, excludes);
   }
 
   /**
@@ -265,11 +264,11 @@ public class NamenodeWebHdfsMethods {
       final long blocksize, final String excludeDatanodes,
       final Param<?, ?>... parameters) throws URISyntaxException, IOException {
     final DatanodeInfo dn;
-    try {
-      dn = chooseDatanode(namenode, path, op, openOffset, blocksize,
-          excludeDatanodes);
-    } catch (InvalidTopologyException ite) {
-      throw new IOException("Failed to find datanode, suggest to check cluster health.", ite);
+    dn = chooseDatanode(namenode, path, op, openOffset, blocksize,
+        excludeDatanodes);
+    if (dn == null) {
+      throw new IOException("Failed to find datanode, suggest to check cluster"
+          + " health. excludeDatanodes=" + excludeDatanodes);
     }
 
     final String delegationQuery;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
index 736230c4bd1..7285d1b1df1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/net/TestNetworkTopology.java
@@ -23,8 +23,11 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,13 +41,18 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
 public class TestNetworkTopology {
   private static final Log LOG = LogFactory.getLog(TestNetworkTopology.class);
   private final static NetworkTopology cluster = new NetworkTopology();
   private DatanodeDescriptor dataNodes[];
-  
+
+  @Rule
+  public Timeout testTimeout = new Timeout(30000);
+
   @Before
   public void setupDatanodes() {
     dataNodes = new DatanodeDescriptor[] {
@@ -272,15 +280,17 @@ public class TestNetworkTopology {
    * @return the frequency that nodes were chosen
    */
   private Map<Node, Integer> pickNodesAtRandom(int numNodes,
-      String excludedScope) {
+      String excludedScope, Collection<Node> excludedNodes) {
     Map<Node, Integer> frequency = new HashMap<Node, Integer>();
     for (DatanodeDescriptor dnd : dataNodes) {
      frequency.put(dnd, 0);
     }
 
     for (int j = 0; j < numNodes; j++) {
-      Node random = cluster.chooseRandom(excludedScope);
-      frequency.put(random, frequency.get(random) + 1);
+      Node random = cluster.chooseRandom(excludedScope, excludedNodes);
+      if (random != null) {
+        frequency.put(random, frequency.get(random) + 1);
+      }
     }
     return frequency;
   }
@@ -291,7 +301,7 @@ public class TestNetworkTopology {
   @Test
   public void testChooseRandomExcludedNode() {
     String scope = "~" + NodeBase.getPath(dataNodes[0]);
-    Map<Node, Integer> frequency = pickNodesAtRandom(100, scope);
+    Map<Node, Integer> frequency = pickNodesAtRandom(100, scope, null);
 
     for (Node key : dataNodes) {
       // all nodes except the first should be more than zero
@@ -304,7 +314,7 @@ public class TestNetworkTopology {
    */
   @Test
   public void testChooseRandomExcludedRack() {
-    Map<Node, Integer> frequency = pickNodesAtRandom(100, "~" + "/d2");
+    Map<Node, Integer> frequency = pickNodesAtRandom(100, "~" + "/d2", null);
     // all the nodes on the second rack should be zero
     for (int j = 0; j < dataNodes.length; j++) {
       int freq = frequency.get(dataNodes[j]);
@@ -316,6 +326,59 @@ public class TestNetworkTopology {
     }
   }
 
+  /**
+   * This test checks that chooseRandom works for a list of excluded nodes.
+   */
+  @Test
+  public void testChooseRandomExcludedNodeList() {
+    String scope = "~" + NodeBase.getPath(dataNodes[0]);
+    Set<Node> excludedNodes = new HashSet<>();
+    excludedNodes.add(dataNodes[3]);
+    excludedNodes.add(dataNodes[5]);
+    excludedNodes.add(dataNodes[7]);
+    excludedNodes.add(dataNodes[9]);
+    excludedNodes.add(dataNodes[13]);
+    excludedNodes.add(dataNodes[18]);
+    Map<Node, Integer> frequency = pickNodesAtRandom(100, scope, excludedNodes);
+
+    assertEquals("dn[3] should be excluded", 0,
+        frequency.get(dataNodes[3]).intValue());
+    assertEquals("dn[5] should be excluded", 0,
+        frequency.get(dataNodes[5]).intValue());
+    assertEquals("dn[7] should be excluded", 0,
+        frequency.get(dataNodes[7]).intValue());
+    assertEquals("dn[9] should be excluded", 0,
+        frequency.get(dataNodes[9]).intValue());
+    assertEquals("dn[13] should be excluded", 0,
+        frequency.get(dataNodes[13]).intValue());
+    assertEquals("dn[18] should be excluded", 0,
+        frequency.get(dataNodes[18]).intValue());
+    for (Node key : dataNodes) {
+      if (excludedNodes.contains(key)) {
+        continue;
+      }
+      // all nodes except the first should be more than zero
+      assertTrue(frequency.get(key) > 0 || key == dataNodes[0]);
+    }
+  }
+
+  /**
+   * This test checks that chooseRandom works when all nodes are excluded.
+   */
+  @Test
+  public void testChooseRandomExcludeAllNodes() {
+    String scope = "~" + NodeBase.getPath(dataNodes[0]);
+    Set<Node> excludedNodes = new HashSet<>();
+    for (int i = 0; i < dataNodes.length; i++) {
+      excludedNodes.add(dataNodes[i]);
+    }
+    Map<Node, Integer> frequency = pickNodesAtRandom(100, scope, excludedNodes);
+    for (Node key : dataNodes) {
+      // no node should have been chosen at all
+      assertTrue(frequency.get(key) == 0);
+    }
+  }
+
   @Test(timeout=180000)
   public void testInvalidNetworkTopologiesNotCachedInHdfs() throws Exception {
     // start a cluster
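
Usage sketch (not part of the patch): the behavioral contract introduced here is that NetworkTopology#chooseRandom(String, Collection) returns null, instead of throwing InvalidTopologyException, once every candidate leaf is excluded, so every caller must null-check the result. The snippet below illustrates that contract against the patched API; the class name and the /rack1/host* paths are made up for illustration.

import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;

public class ChooseRandomContractSketch {
  public static void main(String[] args) {
    NetworkTopology cluster = new NetworkTopology();
    // Hypothetical two-host, single-rack topology.
    Node host1 = new NodeBase("/rack1/host1");
    Node host2 = new NodeBase("/rack1/host2");
    cluster.add(host1);
    cluster.add(host2);

    Set<Node> excluded = new HashSet<>();
    excluded.add(host1);
    // Only host2 remains eligible, so it is the only possible result.
    System.out.println(cluster.chooseRandom(NodeBase.ROOT, excluded));

    excluded.add(host2);
    // All leaves excluded: the patched chooseRandom returns null rather
    // than throwing, which is why BlockPlacementPolicyDefault and
    // NamenodeWebHdfsMethods above now check for a null datanode.
    System.out.println(cluster.chooseRandom(NodeBase.ROOT, excluded));
  }
}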