HADOOP-15317. Improve NetworkTopology chooseRandom's loop.

commit 57374c4737
parent c78cb18c61
NetworkTopology.java

@@ -522,12 +522,12 @@ public class NetworkTopology {
         numOfDatanodes -= ((InnerNode)node).getNumOfLeaves();
       }
     }
-    if (numOfDatanodes == 0) {
-      LOG.debug("Failed to find datanode (scope=\"{}\" excludedScope=\"{}\").",
-          scope, excludedScope);
+    if (numOfDatanodes <= 0) {
+      LOG.debug("Failed to find datanode (scope=\"{}\" excludedScope=\"{}\")."
+          + " numOfDatanodes={}",
+          scope, excludedScope, numOfDatanodes);
       return null;
     }
-    Node ret = null;
     final int availableNodes;
     if (excludedScope == null) {
       availableNodes = countNumOfAvailableNodes(scope, excludedNodes);
@@ -536,25 +536,99 @@ public class NetworkTopology {
           countNumOfAvailableNodes("~" + excludedScope, excludedNodes);
     }
     LOG.debug("Choosing random from {} available nodes on node {},"
-        + " scope={}, excludedScope={}, excludeNodes={}", availableNodes,
-        innerNode, scope, excludedScope, excludedNodes);
+        + " scope={}, excludedScope={}, excludeNodes={}. numOfDatanodes={}.",
+        availableNodes, innerNode, scope, excludedScope, excludedNodes,
+        numOfDatanodes);
+    Node ret = null;
     if (availableNodes > 0) {
-      do {
-        int leaveIndex = r.nextInt(numOfDatanodes);
-        ret = innerNode.getLeaf(leaveIndex, node);
-        if (excludedNodes == null || !excludedNodes.contains(ret)) {
-          break;
-        } else {
-          LOG.debug("Node {} is excluded, continuing.", ret);
-        }
-        // We've counted numOfAvailableNodes inside the lock, so there must be
-        // at least 1 satisfying node. Keep trying until we found it.
-      } while (true);
+      ret = chooseRandom(innerNode, node, excludedNodes, numOfDatanodes,
+          availableNodes);
     }
     LOG.debug("chooseRandom returning {}", ret);
     return ret;
   }
 
+  /**
+   * Randomly choose one node under <i>parentNode</i>, considering the exclude
+   * nodes and scope. Should be called with {@link #netlock}'s readlock held.
+   *
+   * @param parentNode the parent node
+   * @param excludedScopeNode the node corresponding to the exclude scope.
+   * @param excludedNodes a collection of nodes to be excluded from
+   * @param totalInScopeNodes total number of nodes under parentNode, excluding
+   *                          the excludedScopeNode
+   * @param availableNodes number of available nodes under parentNode that
+   *                       could be chosen, excluding excludedNodes
+   * @return the chosen node, or null if none can be chosen
+   */
+  private Node chooseRandom(final InnerNode parentNode,
+      final Node excludedScopeNode, final Collection<Node> excludedNodes,
+      final int totalInScopeNodes, final int availableNodes) {
+    Preconditions.checkArgument(
+        totalInScopeNodes >= availableNodes && availableNodes > 0, String
+            .format("%d should >= %d, and both should be positive.",
+                totalInScopeNodes, availableNodes));
+    if (excludedNodes == null || excludedNodes.isEmpty()) {
+      // if there are no excludedNodes, randomly choose a node
+      final int index = r.nextInt(totalInScopeNodes);
+      return parentNode.getLeaf(index, excludedScopeNode);
+    }
+
+    // excludedNodes non empty.
+    // Choose the nth VALID node, where n is random. VALID meaning it can be
+    // returned, after considering exclude scope and exclude nodes.
+    // The probability of being chosen should be equal for all VALID nodes.
+    // Notably, we do NOT choose nth node, and find the next valid node
+    // if n is excluded - this will make the probability of the node immediately
+    // after an excluded node higher.
+    //
+    // Start point is always 0 and that's fine, because the nth valid node
+    // logic provides equal randomness.
+    //
+    // Consider this example, where 1,3,5 out of the 10 nodes are excluded:
+    // 1 2 3 4 5 6 7 8 9 10
+    // x   x   x
+    // We will randomly choose the nth valid node where n is [0,6].
+    // We do NOT choose a random number n and just use the closest valid node,
+    // for example both n=3 and n=4 will choose 4, making it a 2/10 probability,
+    // higher than the expected 1/7
+    // totalInScopeNodes=10 and availableNodes=7 in this example.
+    int nthValidToReturn = r.nextInt(availableNodes);
+    LOG.debug("nthValidToReturn is {}", nthValidToReturn);
+    Node ret =
+        parentNode.getLeaf(r.nextInt(totalInScopeNodes), excludedScopeNode);
+    if (!excludedNodes.contains(ret)) {
+      // return if we're lucky enough to get a valid node at a random first pick
+      LOG.debug("Chosen node {} from first random", ret);
+      return ret;
+    } else {
+      ret = null;
+    }
+    Node lastValidNode = null;
+    for (int i = 0; i < totalInScopeNodes; ++i) {
+      ret = parentNode.getLeaf(i, excludedScopeNode);
+      if (!excludedNodes.contains(ret)) {
+        if (nthValidToReturn == 0) {
+          break;
+        }
+        --nthValidToReturn;
+        lastValidNode = ret;
+      } else {
+        LOG.debug("Node {} is excluded, continuing.", ret);
+        ret = null;
+      }
+    }
+    if (ret == null && lastValidNode != null) {
+      LOG.error("BUG: Found lastValidNode {} but not nth valid node. "
+          + "parentNode={}, excludedScopeNode={}, excludedNodes={}, "
+          + "totalInScopeNodes={}, availableNodes={}, nthValidToReturn={}.",
+          lastValidNode, parentNode, excludedScopeNode, excludedNodes,
+          totalInScopeNodes, availableNodes, nthValidToReturn);
+      ret = lastValidNode;
+    }
+    return ret;
+  }
+
   /** return leaves in <i>scope</i>
    * @param scope a path string
    * @return leaves nodes under specific scope
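The new private chooseRandom above picks the nth non-excluded leaf, with n drawn uniformly from [0, availableNodes), rather than drawing a random leaf index and skipping forward to the next non-excluded leaf, which would over-select leaves that sit immediately after excluded ones. The following standalone sketch is not Hadoop code; the class name, the 1..10 node numbering and the trial count are illustrative only. It simulates both strategies for the example in the code comment, 10 nodes with 1, 3 and 5 excluded:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

/**
 * Standalone sketch comparing two ways of picking a random non-excluded
 * element from 10 "nodes" with nodes 1, 3 and 5 excluded, matching the
 * example in the comment (totalInScopeNodes=10, availableNodes=7).
 */
public class NthValidSketch {
  public static void main(String[] args) {
    final int total = 10;                              // totalInScopeNodes
    final Set<Integer> excluded = new HashSet<>(Arrays.asList(1, 3, 5));
    final int available = total - excluded.size();     // availableNodes = 7
    final int trials = 700_000;
    final Random r = new Random();

    int[] nthValid = new int[total + 1];   // counts for the "nth valid" strategy
    int[] nextValid = new int[total + 1];  // counts for the biased "next valid" strategy

    for (int t = 0; t < trials; t++) {
      // Strategy used by the patch: pick the nth VALID node, n uniform in [0, available).
      int n = r.nextInt(available);
      for (int node = 1; node <= total; node++) {
        if (excluded.contains(node)) {
          continue;
        }
        if (n-- == 0) {
          nthValid[node]++;
          break;
        }
      }

      // Rejected strategy: pick a random index and advance to the next valid node.
      // Nodes right after an excluded node (2, 4 and 6 here) get picked too often.
      int idx = 1 + r.nextInt(total);
      while (excluded.contains(idx)) {
        idx = idx % total + 1;
      }
      nextValid[idx]++;
    }

    System.out.println("expected per valid node ~" + trials / available);
    for (int node = 1; node <= total; node++) {
      System.out.printf("node %2d  nthValid=%6d  nextValid=%6d%n",
          node, nthValid[node], nextValid[node]);
    }
  }
}

With enough trials the nthValid column stays flat at roughly trials/7 per valid node, while the nextValid column gives nodes 2, 4 and 6 about twice the share of nodes 7 through 10, which is exactly the bias the code comment describes.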
TestNetworkTopology.java

@@ -27,10 +27,9 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -39,14 +38,19 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
 
 public class TestNetworkTopology {
-  private static final Log LOG = LogFactory.getLog(TestNetworkTopology.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestNetworkTopology.class);
   private final static NetworkTopology cluster =
       NetworkTopology.getInstance(new Configuration());
   private DatanodeDescriptor dataNodes[];
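Besides pulling in the imports used by the added tests, the hunk above moves TestNetworkTopology from commons-logging to SLF4J, which is what allows the {}-placeholder log statements added further down in this diff (for example, LOG.info("Excluded nodes are: {}", excludedNodes) in verifyResults). A minimal sketch of that logging pattern, assuming slf4j-api plus some binding on the classpath; the class name here is hypothetical:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical example class; only the SLF4J logging pattern is the point.
public class ParameterizedLoggingSketch {
  private static final Logger LOG =
      LoggerFactory.getLogger(ParameterizedLoggingSketch.class);

  public static void main(String[] args) {
    int availableNodes = 7;
    String scope = "/d1";
    // '{}' placeholders are filled in by SLF4J; the message is only built
    // when the INFO level is enabled, so no manual string concatenation or
    // isInfoEnabled() guard is needed.
    LOG.info("Choosing random from {} available nodes, scope={}",
        availableNodes, scope);
  }
}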
@@ -83,6 +87,7 @@ public class TestNetworkTopology {
     }
     dataNodes[9].setDecommissioned();
     dataNodes[10].setDecommissioned();
+    GenericTestUtils.setLogLevel(NetworkTopology.LOG, Level.TRACE);
   }
 
   @Test
@@ -324,6 +329,7 @@ public class TestNetworkTopology {
         frequency.put(random, frequency.get(random) + 1);
       }
     }
+    LOG.info("Result:" + frequency);
     return frequency;
   }
 
@@ -471,4 +477,67 @@ public class TestNetworkTopology {
     }
   }
 
+  /**
+   * Tests chooseRandom with include scope, excluding a few nodes.
+   */
+  @Test
+  public void testChooseRandomInclude1() {
+    final String scope = "/d1";
+    final Set<Node> excludedNodes = new HashSet<>();
+    final Random r = new Random();
+    for (int i = 0; i < 4; ++i) {
+      final int index = r.nextInt(5);
+      excludedNodes.add(dataNodes[index]);
+    }
+    Map<Node, Integer> frequency = pickNodesAtRandom(100, scope, excludedNodes);
+
+    verifyResults(5, excludedNodes, frequency);
+  }
+
+  /**
+   * Tests chooseRandom with include scope at rack, excluding a node.
+   */
+  @Test
+  public void testChooseRandomInclude2() {
+    String scope = dataNodes[0].getNetworkLocation();
+    Set<Node> excludedNodes = new HashSet<>();
+    final Random r = new Random();
+    int index = r.nextInt(1);
+    excludedNodes.add(dataNodes[index]);
+    final int count = 100;
+    Map<Node, Integer> frequency =
+        pickNodesAtRandom(count, scope, excludedNodes);
+
+    verifyResults(1, excludedNodes, frequency);
+  }
+
+  private void verifyResults(int upperbound, Set<Node> excludedNodes,
+      Map<Node, Integer> frequency) {
+    LOG.info("Excluded nodes are: {}", excludedNodes);
+    for (int i = 0; i < upperbound; ++i) {
+      final Node n = dataNodes[i];
+      LOG.info("Verifying node {}", n);
+      if (excludedNodes.contains(n)) {
+        assertEquals(n + " should not have been chosen.", 0,
+            (int) frequency.get(n));
+      } else {
+        assertTrue(n + " should have been chosen", frequency.get(n) > 0);
+      }
+    }
+  }
+
+  /**
+   * Tests chooseRandom with include scope, no exclude nodes.
+   */
+  @Test
+  public void testChooseRandomInclude3() {
+    String scope = "/d1";
+    Map<Node, Integer> frequency = pickNodesAtRandom(200, scope, null);
+    LOG.info("No node is excluded.");
+    for (int i = 0; i < 5; ++i) {
+      // all nodes should be more than zero
+      assertTrue(dataNodes[i] + " should have been chosen.",
+          frequency.get(dataNodes[i]) > 0);
+    }
+  }
 }