HADOOP-17408. Optimize NetworkTopology sorting block locations. (#2601). Contributed by Ahmed Hussein and Daryn Sharp.

(cherry picked from commit 77435a025e)
This commit is contained in:
Ahmed Hussein 2021-01-08 13:10:09 -06:00 committed by Jim Brennan
parent cd5ee0014f
commit 18e2835766
2 changed files with 36 additions and 44 deletions

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.net;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -29,6 +28,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
@ -52,6 +53,8 @@ public class NetworkTopology {
private static final char PATH_SEPARATOR = '/';
private static final String PATH_SEPARATOR_STR = "/";
private static final String ROOT = "/";
private static final AtomicReference<Random> RANDOM_REF =
new AtomicReference<>();
public static class InvalidTopologyException extends RuntimeException {
private static final long serialVersionUID = 1L;
@ -396,17 +399,12 @@ public class NetworkTopology {
* @exception IllegalArgumentException when either node1 or node2 is null, or
* node1 or node2 do not belong to the cluster
*/
public boolean isOnSameRack( Node node1, Node node2) {
public boolean isOnSameRack(Node node1, Node node2) {
if (node1 == null || node2 == null) {
return false;
}
netlock.readLock().lock();
try {
return isSameParents(node1, node2);
} finally {
netlock.readLock().unlock();
}
return isSameParents(node1, node2);
}
/**
@ -440,11 +438,14 @@ public class NetworkTopology {
return node1.getParent()==node2.getParent();
}
private static final Random r = new Random();
@VisibleForTesting
void setRandomSeed(long seed) {
r.setSeed(seed);
RANDOM_REF.set(new Random(seed));
}
Random getRandom() {
Random random = RANDOM_REF.get();
return (random == null) ? ThreadLocalRandom.current() : random;
}
/**
@ -563,6 +564,7 @@ public class NetworkTopology {
totalInScopeNodes, availableNodes);
return null;
}
Random r = getRandom();
if (excludedNodes == null || excludedNodes.isEmpty()) {
// if there are no excludedNodes, randomly choose a node
final int index = r.nextInt(totalInScopeNodes);
@ -879,7 +881,7 @@ public class NetworkTopology {
* This method is called if the reader is a datanode,
* so nonDataNodeReader flag is set to false.
*/
sortByDistance(reader, nodes, activeLen, list -> Collections.shuffle(list));
sortByDistance(reader, nodes, activeLen, null);
}
/**
@ -922,8 +924,7 @@ public class NetworkTopology {
* This method is called if the reader is not a datanode,
* so nonDataNodeReader flag is set to true.
*/
sortByDistanceUsingNetworkLocation(reader, nodes, activeLen,
list -> Collections.shuffle(list));
sortByDistanceUsingNetworkLocation(reader, nodes, activeLen, null);
}
/**
@ -961,38 +962,28 @@ public class NetworkTopology {
int activeLen, Consumer<List<T>> secondarySort,
boolean nonDataNodeReader) {
/** Sort weights for the nodes array */
int[] weights = new int[activeLen];
for (int i=0; i<activeLen; i++) {
if(nonDataNodeReader) {
weights[i] = getWeightUsingNetworkLocation(reader, nodes[i]);
TreeMap<Integer, List<T>> weightedNodeTree =
new TreeMap<>();
int nWeight;
for (int i = 0; i < activeLen; i++) {
if (nonDataNodeReader) {
nWeight = getWeightUsingNetworkLocation(reader, nodes[i]);
} else {
weights[i] = getWeight(reader, nodes[i]);
nWeight = getWeight(reader, nodes[i]);
}
weightedNodeTree.computeIfAbsent(
nWeight, k -> new ArrayList<>(1)).add(nodes[i]);
}
// Add weight/node pairs to a TreeMap to sort
TreeMap<Integer, List<T>> tree = new TreeMap<>();
for (int i=0; i<activeLen; i++) {
int weight = weights[i];
T node = nodes[i];
List<T> list = tree.get(weight);
if (list == null) {
list = Lists.newArrayListWithExpectedSize(1);
tree.put(weight, list);
}
list.add(node);
}
// Sort nodes which have the same weight using secondarySort.
int idx = 0;
for (List<T> list: tree.values()) {
if (list != null) {
Collections.shuffle(list, r);
if (secondarySort != null) {
secondarySort.accept(list);
}
for (T n: list) {
nodes[idx] = n;
idx++;
}
// Sort nodes which have the same weight using secondarySort.
for (List<T> nodesList : weightedNodeTree.values()) {
Collections.shuffle(nodesList, getRandom());
if (secondarySort != null) {
// a secondary sort breaks the tie between nodes.
secondarySort.accept(nodesList);
}
for (T n : nodesList) {
nodes[idx++] = n;
}
}
Preconditions.checkState(idx == activeLen,

View File

@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSTestUtil;
@ -56,7 +57,7 @@ public class TestNetworkTopology {
private DatanodeDescriptor dataNodes[];
@Rule
public Timeout testTimeout = new Timeout(30000);
public Timeout testTimeout = new Timeout(30000, TimeUnit.MILLISECONDS);
@Before
public void setupDatanodes() {