HDFS-10320. Rack failures may result in NN terminate. (Xiao Chen via mingma)

This commit is contained in:
Ming Ma 2016-05-04 17:02:26 -07:00
parent 9e37fe3b7a
commit 1268cf5fbe
5 changed files with 196 additions and 96 deletions

View File

@ -29,13 +29,13 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -54,8 +54,8 @@ import com.google.common.collect.Lists;
public class NetworkTopology { public class NetworkTopology {
public final static String DEFAULT_RACK = "/default-rack"; public final static String DEFAULT_RACK = "/default-rack";
public final static int DEFAULT_HOST_LEVEL = 2; public final static int DEFAULT_HOST_LEVEL = 2;
public static final Log LOG = public static final Logger LOG =
LogFactory.getLog(NetworkTopology.class); LoggerFactory.getLogger(NetworkTopology.class);
public static class InvalidTopologyException extends RuntimeException { public static class InvalidTopologyException extends RuntimeException {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@ -442,9 +442,7 @@ public class NetworkTopology {
} }
} }
} }
if(LOG.isDebugEnabled()) { LOG.debug("NetworkTopology became:\n{}", this.toString());
LOG.debug("NetworkTopology became:\n" + this.toString());
}
} finally { } finally {
netlock.writeLock().unlock(); netlock.writeLock().unlock();
} }
@ -517,9 +515,7 @@ public class NetworkTopology {
numOfRacks--; numOfRacks--;
} }
} }
if(LOG.isDebugEnabled()) { LOG.debug("NetworkTopology became:\n{}", this.toString());
LOG.debug("NetworkTopology became:\n" + this.toString());
}
} finally { } finally {
netlock.writeLock().unlock(); netlock.writeLock().unlock();
} }
@ -717,26 +713,45 @@ public class NetworkTopology {
r.setSeed(seed); r.setSeed(seed);
} }
/** randomly choose one node from <i>scope</i> /**
* if scope starts with ~, choose one from the all nodes except for the * Randomly choose a node.
* ones in <i>scope</i>; otherwise, choose one from <i>scope</i> *
* @param scope range of nodes from which a node will be chosen * @param scope range of nodes from which a node will be chosen
* @return the chosen node * @return the chosen node
*
* @see #chooseRandom(String, Collection)
*/ */
public Node chooseRandom(String scope) { public Node chooseRandom(final String scope) {
return chooseRandom(scope, null);
}
/**
* Randomly choose one node from <i>scope</i>.
*
* If scope starts with ~, choose one from the all nodes except for the
* ones in <i>scope</i>; otherwise, choose one from <i>scope</i>.
* If excludedNodes is given, choose a node that's not in excludedNodes.
*
* @param scope range of nodes from which a node will be chosen
* @param excludedNodes nodes to be excluded from
* @return the chosen node
*/
public Node chooseRandom(final String scope,
final Collection<Node> excludedNodes) {
netlock.readLock().lock(); netlock.readLock().lock();
try { try {
if (scope.startsWith("~")) { if (scope.startsWith("~")) {
return chooseRandom(NodeBase.ROOT, scope.substring(1)); return chooseRandom(NodeBase.ROOT, scope.substring(1), excludedNodes);
} else { } else {
return chooseRandom(scope, null); return chooseRandom(scope, null, excludedNodes);
} }
} finally { } finally {
netlock.readLock().unlock(); netlock.readLock().unlock();
} }
} }
private Node chooseRandom(String scope, String excludedScope){ private Node chooseRandom(final String scope, String excludedScope,
final Collection<Node> excludedNodes) {
if (excludedScope != null) { if (excludedScope != null) {
if (scope.startsWith(excludedScope)) { if (scope.startsWith(excludedScope)) {
return null; return null;
@ -747,7 +762,8 @@ public class NetworkTopology {
} }
Node node = getNode(scope); Node node = getNode(scope);
if (!(node instanceof InnerNode)) { if (!(node instanceof InnerNode)) {
return node; return excludedNodes != null && excludedNodes.contains(node) ?
null : node;
} }
InnerNode innerNode = (InnerNode)node; InnerNode innerNode = (InnerNode)node;
int numOfDatanodes = innerNode.getNumOfLeaves(); int numOfDatanodes = innerNode.getNumOfLeaves();
@ -762,12 +778,36 @@ public class NetworkTopology {
} }
} }
if (numOfDatanodes == 0) { if (numOfDatanodes == 0) {
throw new InvalidTopologyException( LOG.warn("Failed to find datanode (scope=\"{}\" excludedScope=\"{}\").",
"Failed to find datanode (scope=\"" + String.valueOf(scope) + String.valueOf(scope), String.valueOf(excludedScope));
"\" excludedScope=\"" + String.valueOf(excludedScope) + "\")."); return null;
} }
Node ret = null;
final int availableNodes;
if (excludedScope == null) {
availableNodes = countNumOfAvailableNodes(scope, excludedNodes);
} else {
availableNodes =
countNumOfAvailableNodes("~" + excludedScope, excludedNodes);
}
LOG.debug("Choosing random from {} available nodes on node {},"
+ " scope={}, excludedScope={}, excludeNodes={}", availableNodes,
innerNode.toString(), scope, excludedScope, excludedNodes);
if (availableNodes > 0) {
do {
int leaveIndex = r.nextInt(numOfDatanodes); int leaveIndex = r.nextInt(numOfDatanodes);
return innerNode.getLeaf(leaveIndex, node); ret = innerNode.getLeaf(leaveIndex, node);
if (excludedNodes == null || !excludedNodes.contains(ret)) {
break;
} else {
LOG.debug("Node {} is excluded, continuing.", ret);
}
// We've counted numOfAvailableNodes inside the lock, so there must be
// at least 1 satisfying node. Keep trying until we found it.
} while (true);
}
LOG.debug("chooseRandom returning {}", ret);
return ret;
} }
/** return leaves in <i>scope</i> /** return leaves in <i>scope</i>
@ -795,6 +835,7 @@ public class NetworkTopology {
* @param excludedNodes a list of nodes * @param excludedNodes a list of nodes
* @return number of available nodes * @return number of available nodes
*/ */
@VisibleForTesting
public int countNumOfAvailableNodes(String scope, public int countNumOfAvailableNodes(String scope,
Collection<Node> excludedNodes) { Collection<Node> excludedNodes) {
boolean isExcluded=false; boolean isExcluded=false;
@ -807,6 +848,7 @@ public class NetworkTopology {
int excludedCountOffScope = 0; // the number of nodes outside scope & excludedNodes int excludedCountOffScope = 0; // the number of nodes outside scope & excludedNodes
netlock.readLock().lock(); netlock.readLock().lock();
try { try {
if (excludedNodes != null) {
for (Node node : excludedNodes) { for (Node node : excludedNodes) {
node = getNode(NodeBase.getPath(node)); node = getNode(NodeBase.getPath(node));
if (node == null) { if (node == null) {
@ -819,6 +861,7 @@ public class NetworkTopology {
excludedCountOffScope++; excludedCountOffScope++;
} }
} }
}
Node n = getNode(scope); Node n = getNode(scope);
int scopeNodeCount = 0; int scopeNodeCount = 0;
if (n != null) { if (n != null) {

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT;
import java.util.Collection;
import java.util.Random; import java.util.Random;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -28,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
/** /**
* Space balanced block placement policy. * Space balanced block placement policy.
@ -68,9 +70,12 @@ public class AvailableSpaceBlockPlacementPolicy extends
} }
@Override @Override
protected DatanodeDescriptor chooseDataNode(String scope) { protected DatanodeDescriptor chooseDataNode(final String scope,
DatanodeDescriptor a = (DatanodeDescriptor) clusterMap.chooseRandom(scope); final Collection<Node> excludedNode) {
DatanodeDescriptor b = (DatanodeDescriptor) clusterMap.chooseRandom(scope); DatanodeDescriptor a =
(DatanodeDescriptor) clusterMap.chooseRandom(scope, excludedNode);
DatanodeDescriptor b =
(DatanodeDescriptor) clusterMap.chooseRandom(scope, excludedNode);
int ret = compareDataNode(a, b); int ret = compareDataNode(a, b);
if (ret == 0) { if (ret == 0) {
return a; return a;

View File

@ -21,6 +21,7 @@ import static org.apache.hadoop.util.Time.monotonicNow;
import java.util.*; import java.util.*;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.AddBlockFlag; import org.apache.hadoop.hdfs.AddBlockFlag;
@ -643,10 +644,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
/** /**
* Choose <i>numOfReplicas</i> nodes from the racks * Choose <i>numOfReplicas</i> nodes from the racks
* that <i>localMachine</i> is NOT on. * that <i>localMachine</i> is NOT on.
* if not enough nodes are available, choose the remaining ones * If not enough nodes are available, choose the remaining ones
* from the local rack * from the local rack
*/ */
protected void chooseRemoteRack(int numOfReplicas, protected void chooseRemoteRack(int numOfReplicas,
DatanodeDescriptor localMachine, DatanodeDescriptor localMachine,
Set<Node> excludedNodes, Set<Node> excludedNodes,
@ -702,10 +702,6 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
boolean avoidStaleNodes, boolean avoidStaleNodes,
EnumMap<StorageType, Integer> storageTypes) EnumMap<StorageType, Integer> storageTypes)
throws NotEnoughReplicasException { throws NotEnoughReplicasException {
int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(
scope, excludedNodes);
int refreshCounter = numOfAvailableNodes;
StringBuilder builder = null; StringBuilder builder = null;
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
builder = debugLoggingBuilder.get(); builder = debugLoggingBuilder.get();
@ -714,18 +710,22 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
} }
boolean badTarget = false; boolean badTarget = false;
DatanodeStorageInfo firstChosen = null; DatanodeStorageInfo firstChosen = null;
while(numOfReplicas > 0 && numOfAvailableNodes > 0) { while (numOfReplicas > 0) {
DatanodeDescriptor chosenNode = chooseDataNode(scope); DatanodeDescriptor chosenNode = chooseDataNode(scope, excludedNodes);
if (excludedNodes.add(chosenNode)) { //was not in the excluded list if (chosenNode == null) {
if (LOG.isDebugEnabled()) { break;
builder.append("\nNode ").append(NodeBase.getPath(chosenNode)).append(" ["); }
Preconditions.checkState(excludedNodes.add(chosenNode), "chosenNode "
+ chosenNode + " is already in excludedNodes " + excludedNodes);
if (LOG.isDebugEnabled()) {
builder.append("\nNode ").append(NodeBase.getPath(chosenNode))
.append(" [");
} }
numOfAvailableNodes--;
DatanodeStorageInfo storage = null; DatanodeStorageInfo storage = null;
if (isGoodDatanode(chosenNode, maxNodesPerRack, considerLoad, if (isGoodDatanode(chosenNode, maxNodesPerRack, considerLoad,
results, avoidStaleNodes)) { results, avoidStaleNodes)) {
for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
.entrySet().iterator(); iter.hasNext(); ) { .entrySet().iterator(); iter.hasNext();) {
Map.Entry<StorageType, Integer> entry = iter.next(); Map.Entry<StorageType, Integer> entry = iter.next();
storage = chooseStorage4Block( storage = chooseStorage4Block(
chosenNode, blocksize, results, entry.getKey()); chosenNode, blocksize, results, entry.getKey());
@ -734,8 +734,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
if (firstChosen == null) { if (firstChosen == null) {
firstChosen = storage; firstChosen = storage;
} }
// add node and related nodes to excludedNode // add node (subclasses may also add related nodes) to excludedNode
numOfAvailableNodes -=
addToExcludedNodes(chosenNode, excludedNodes); addToExcludedNodes(chosenNode, excludedNodes);
int num = entry.getValue(); int num = entry.getValue();
if (num == 1) { if (num == 1) {
@ -746,7 +745,6 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
break; break;
} }
} }
}
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
builder.append("\n]"); builder.append("\n]");
@ -755,16 +753,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
// If no candidate storage was found on this DN then set badTarget. // If no candidate storage was found on this DN then set badTarget.
badTarget = (storage == null); badTarget = (storage == null);
} }
// Refresh the node count. If the live node count became smaller,
// but it is not reflected in this loop, it may loop forever in case
// the replicas/rack cannot be satisfied.
if (--refreshCounter == 0) {
numOfAvailableNodes = clusterMap.countNumOfAvailableNodes(scope,
excludedNodes);
refreshCounter = numOfAvailableNodes;
} }
}
if (numOfReplicas>0) { if (numOfReplicas>0) {
String detail = enableDebugLogging; String detail = enableDebugLogging;
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -785,8 +774,9 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
* Choose a datanode from the given <i>scope</i>. * Choose a datanode from the given <i>scope</i>.
* @return the chosen node, if there is any. * @return the chosen node, if there is any.
*/ */
protected DatanodeDescriptor chooseDataNode(final String scope) { protected DatanodeDescriptor chooseDataNode(final String scope,
return (DatanodeDescriptor) clusterMap.chooseRandom(scope); final Collection<Node> excludedNodes) {
return (DatanodeDescriptor) clusterMap.chooseRandom(scope, excludedNodes);
} }
/** /**

View File

@ -81,7 +81,6 @@ import org.apache.hadoop.hdfs.web.resources.*;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
import org.apache.hadoop.net.Node; import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
@ -225,7 +224,7 @@ public class NamenodeWebHdfsMethods {
} }
return (DatanodeDescriptor)bm.getDatanodeManager().getNetworkTopology( return (DatanodeDescriptor)bm.getDatanodeManager().getNetworkTopology(
).chooseRandom(NodeBase.ROOT); ).chooseRandom(NodeBase.ROOT, excludes);
} }
/** /**
@ -265,11 +264,11 @@ public class NamenodeWebHdfsMethods {
final long blocksize, final String excludeDatanodes, final long blocksize, final String excludeDatanodes,
final Param<?, ?>... parameters) throws URISyntaxException, IOException { final Param<?, ?>... parameters) throws URISyntaxException, IOException {
final DatanodeInfo dn; final DatanodeInfo dn;
try {
dn = chooseDatanode(namenode, path, op, openOffset, blocksize, dn = chooseDatanode(namenode, path, op, openOffset, blocksize,
excludeDatanodes); excludeDatanodes);
} catch (InvalidTopologyException ite) { if (dn == null) {
throw new IOException("Failed to find datanode, suggest to check cluster health.", ite); throw new IOException("Failed to find datanode, suggest to check cluster"
+ " health. excludeDatanodes=" + excludeDatanodes);
} }
final String delegationQuery; final String delegationQuery;

View File

@ -23,8 +23,11 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -38,13 +41,18 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.Timeout;
public class TestNetworkTopology { public class TestNetworkTopology {
private static final Log LOG = LogFactory.getLog(TestNetworkTopology.class); private static final Log LOG = LogFactory.getLog(TestNetworkTopology.class);
private final static NetworkTopology cluster = new NetworkTopology(); private final static NetworkTopology cluster = new NetworkTopology();
private DatanodeDescriptor dataNodes[]; private DatanodeDescriptor dataNodes[];
@Rule
public Timeout testTimeout = new Timeout(30000);
@Before @Before
public void setupDatanodes() { public void setupDatanodes() {
dataNodes = new DatanodeDescriptor[] { dataNodes = new DatanodeDescriptor[] {
@ -272,16 +280,18 @@ public class TestNetworkTopology {
* @return the frequency that nodes were chosen * @return the frequency that nodes were chosen
*/ */
private Map<Node, Integer> pickNodesAtRandom(int numNodes, private Map<Node, Integer> pickNodesAtRandom(int numNodes,
String excludedScope) { String excludedScope, Collection<Node> excludedNodes) {
Map<Node, Integer> frequency = new HashMap<Node, Integer>(); Map<Node, Integer> frequency = new HashMap<Node, Integer>();
for (DatanodeDescriptor dnd : dataNodes) { for (DatanodeDescriptor dnd : dataNodes) {
frequency.put(dnd, 0); frequency.put(dnd, 0);
} }
for (int j = 0; j < numNodes; j++) { for (int j = 0; j < numNodes; j++) {
Node random = cluster.chooseRandom(excludedScope); Node random = cluster.chooseRandom(excludedScope, excludedNodes);
if (random != null) {
frequency.put(random, frequency.get(random) + 1); frequency.put(random, frequency.get(random) + 1);
} }
}
return frequency; return frequency;
} }
@ -291,7 +301,7 @@ public class TestNetworkTopology {
@Test @Test
public void testChooseRandomExcludedNode() { public void testChooseRandomExcludedNode() {
String scope = "~" + NodeBase.getPath(dataNodes[0]); String scope = "~" + NodeBase.getPath(dataNodes[0]);
Map<Node, Integer> frequency = pickNodesAtRandom(100, scope); Map<Node, Integer> frequency = pickNodesAtRandom(100, scope, null);
for (Node key : dataNodes) { for (Node key : dataNodes) {
// all nodes except the first should be more than zero // all nodes except the first should be more than zero
@ -304,7 +314,7 @@ public class TestNetworkTopology {
*/ */
@Test @Test
public void testChooseRandomExcludedRack() { public void testChooseRandomExcludedRack() {
Map<Node, Integer> frequency = pickNodesAtRandom(100, "~" + "/d2"); Map<Node, Integer> frequency = pickNodesAtRandom(100, "~" + "/d2", null);
// all the nodes on the second rack should be zero // all the nodes on the second rack should be zero
for (int j = 0; j < dataNodes.length; j++) { for (int j = 0; j < dataNodes.length; j++) {
int freq = frequency.get(dataNodes[j]); int freq = frequency.get(dataNodes[j]);
@ -316,6 +326,59 @@ public class TestNetworkTopology {
} }
} }
/**
* This test checks that chooseRandom works for a list of excluded nodes.
*/
@Test
public void testChooseRandomExcludedNodeList() {
String scope = "~" + NodeBase.getPath(dataNodes[0]);
Set<Node> excludedNodes = new HashSet<>();
excludedNodes.add(dataNodes[3]);
excludedNodes.add(dataNodes[5]);
excludedNodes.add(dataNodes[7]);
excludedNodes.add(dataNodes[9]);
excludedNodes.add(dataNodes[13]);
excludedNodes.add(dataNodes[18]);
Map<Node, Integer> frequency = pickNodesAtRandom(100, scope, excludedNodes);
assertEquals("dn[3] should be excluded", 0,
frequency.get(dataNodes[3]).intValue());
assertEquals("dn[5] should be exclude18d", 0,
frequency.get(dataNodes[5]).intValue());
assertEquals("dn[7] should be excluded", 0,
frequency.get(dataNodes[7]).intValue());
assertEquals("dn[9] should be excluded", 0,
frequency.get(dataNodes[9]).intValue());
assertEquals("dn[13] should be excluded", 0,
frequency.get(dataNodes[13]).intValue());
assertEquals("dn[18] should be excluded", 0,
frequency.get(dataNodes[18]).intValue());
for (Node key : dataNodes) {
if (excludedNodes.contains(key)) {
continue;
}
// all nodes except the first should be more than zero
assertTrue(frequency.get(key) > 0 || key == dataNodes[0]);
}
}
/**
* This test checks that chooseRandom works when all nodes are excluded.
*/
@Test
public void testChooseRandomExcludeAllNodes() {
String scope = "~" + NodeBase.getPath(dataNodes[0]);
Set<Node> excludedNodes = new HashSet<>();
for (int i = 0; i < dataNodes.length; i++) {
excludedNodes.add(dataNodes[i]);
}
Map<Node, Integer> frequency = pickNodesAtRandom(100, scope, excludedNodes);
for (Node key : dataNodes) {
// all nodes except the first should be more than zero
assertTrue(frequency.get(key) == 0);
}
}
@Test(timeout=180000) @Test(timeout=180000)
public void testInvalidNetworkTopologiesNotCachedInHdfs() throws Exception { public void testInvalidNetworkTopologiesNotCachedInHdfs() throws Exception {
// start a cluster // start a cluster