From 45847e8df1b447ddbc1b683b817a095f2f969190 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Wed, 4 Jun 2014 01:08:02 +0000 Subject: [PATCH] Addendum for common changes in HDFS-6268 git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1599846 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/hadoop/net/NetworkTopology.java | 147 +++++++++++------- .../net/NetworkTopologyWithNodeGroup.java | 105 ++++--------- .../net/TestNetworkTopologyWithNodeGroup.java | 10 +- 3 files changed, 131 insertions(+), 131 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java index daf7dc735e4..9ba0b43d985 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java @@ -20,8 +20,10 @@ package org.apache.hadoop.net; import java.util.ArrayList; import java.util.List; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Random; +import java.util.TreeMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -33,6 +35,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.util.ReflectionUtils; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + /** The class represents a cluster of computer with a tree hierarchical * network topology. * For example, a cluster may be consists of many data centers filled @@ -668,7 +673,23 @@ public class NetworkTopology { return node1.getParent()==node2.getParent(); } - final protected static Random r = new Random(); + private static final ThreadLocal r = new ThreadLocal(); + + /** + * Getter for thread-local Random, which provides better performance than + * a shared Random (even though Random is thread-safe). + * + * @return Thread-local Random. + */ + protected Random getRandom() { + Random rand = r.get(); + if (rand == null) { + rand = new Random(); + r.set(rand); + } + return rand; + } + /** randomly choose one node from scope * if scope starts with ~, choose one from the all nodes except for the * ones in scope; otherwise, choose one from scope @@ -718,7 +739,7 @@ public class NetworkTopology { "Failed to find datanode (scope=\"" + String.valueOf(scope) + "\" excludedScope=\"" + String.valueOf(excludedScope) + "\")."); } - int leaveIndex = r.nextInt(numOfDatanodes); + int leaveIndex = getRandom().nextInt(numOfDatanodes); return innerNode.getLeaf(leaveIndex, node); } @@ -825,61 +846,79 @@ public class NetworkTopology { return networkLocation.substring(index); } - /** swap two array items */ - static protected void swap(Node[] nodes, int i, int j) { - Node tempNode; - tempNode = nodes[j]; - nodes[j] = nodes[i]; - nodes[i] = tempNode; - } - - /** Sort nodes array by their distances to reader - * It linearly scans the array, if a local node is found, swap it with - * the first element of the array. - * If a local rack node is found, swap it with the first element following - * the local node. - * If neither local node or local rack node is found, put a random replica - * location at position 0. - * It leaves the rest nodes untouched. - * @param reader the node that wishes to read a block from one of the nodes - * @param nodes the list of nodes containing data for the reader + /** + * Returns an integer weight which specifies how far away {node} is away from + * {reader}. A lower value signifies that a node is closer. + * + * @param reader Node where data will be read + * @param node Replica of data + * @return weight */ - public void pseudoSortByDistance( Node reader, Node[] nodes ) { - int tempIndex = 0; - int localRackNode = -1; - if (reader != null ) { - //scan the array to find the local node & local rack node - for(int i=0; ireader. + *

+ * In a three-level topology, a node can be either local, on the same rack, or + * on a different rack from the reader. Sorting the nodes based on network + * distance from the reader reduces network traffic and improves performance. + *

+ * As an additional twist, we also randomize the nodes at each network + * distance using the provided random seed. This helps with load balancing + * when there is data skew. + * + * @param reader Node where data will be read + * @param nodes Available replicas with the requested data + * @param seed Used to seed the pseudo-random generator that randomizes the + * set of nodes at each network distance. + */ + public void sortByDistance(Node reader, Node[] nodes, long seed) { + /** Sort weights for the nodes array */ + int[] weights = new int[nodes.length]; + for (int i=0; i> tree = new TreeMap>(); + for (int i=0; i list = tree.get(weight); + if (list == null) { + list = Lists.newArrayListWithExpectedSize(1); + tree.put(weight, list); + } + list.add(node); + } + + // Seed is normally the block id + // This means we use the same pseudo-random order for each block, for + // potentially better page cache usage. + Random rand = getRandom(); + rand.setSeed(seed); + int idx = 0; + for (List list: tree.values()) { + if (list != null) { + Collections.shuffle(list, rand); + for (Node n: list) { + nodes[idx] = n; + idx++; } } - - // swap the local rack node and the node at position tempIndex - if(localRackNode != -1 && localRackNode != tempIndex ) { - swap(nodes, tempIndex, localRackNode); - tempIndex++; - } - } - - // put a random node at position 0 if it is not a local/local-rack node - if(tempIndex == 0 && localRackNode == -1 && nodes.length != 0) { - swap(nodes, 0, r.nextInt(nodes.length)); } + Preconditions.checkState(idx == nodes.length, + "Sorted the wrong number of nodes!"); } - } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java index 86920f08b83..975fe4d1e31 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopologyWithNodeGroup.java @@ -248,25 +248,41 @@ public class NetworkTopologyWithNodeGroup extends NetworkTopology { } } - /** Sort nodes array by their distances to reader - * It linearly scans the array, if a local node is found, swap it with - * the first element of the array. - * If a local node group node is found, swap it with the first element - * following the local node. - * If a local rack node is found, swap it with the first element following - * the local node group node. - * If neither local node, node group node or local rack node is found, put a - * random replica location at position 0. - * It leaves the rest nodes untouched. - * @param reader the node that wishes to read a block from one of the nodes - * @param nodes the list of nodes containing data for the reader + @Override + protected int getWeight(Node reader, Node node) { + // 0 is local, 1 is same node group, 2 is same rack, 3 is off rack + // Start off by initializing to off rack + int weight = 3; + if (reader != null) { + if (reader == node) { + weight = 0; + } else if (isOnSameNodeGroup(reader, node)) { + weight = 1; + } else if (isOnSameRack(reader, node)) { + weight = 2; + } + } + return weight; + } + + /** + * Sort nodes array by their distances to reader. + *

+ * This is the same as + * {@link NetworkTopology#sortByDistance(Node, Node[], long)} except with a + * four-level network topology which contains the additional network distance + * of a "node group" which is between local and same rack. + * + * @param reader Node where data will be read + * @param nodes Available replicas with the requested data + * @param seed Used to seed the pseudo-random generator that randomizes the + * set of nodes at each network distance. */ @Override - public void pseudoSortByDistance( Node reader, Node[] nodes ) { - + public void sortByDistance( Node reader, Node[] nodes, long seed) { + // If reader is not a datanode (not in NetworkTopology tree), we need to + // replace this reader with a sibling leaf node in tree. if (reader != null && !this.contains(reader)) { - // if reader is not a datanode (not in NetworkTopology tree), we will - // replace this reader with a sibling leaf node in tree. Node nodeGroup = getNode(reader.getNetworkLocation()); if (nodeGroup != null && nodeGroup instanceof InnerNode) { InnerNode parentNode = (InnerNode) nodeGroup; @@ -276,62 +292,7 @@ public class NetworkTopologyWithNodeGroup extends NetworkTopology { return; } } - int tempIndex = 0; - int localRackNode = -1; - int localNodeGroupNode = -1; - if (reader != null) { - //scan the array to find the local node & local rack node - for (int i = 0; i < nodes.length; i++) { - if (tempIndex == 0 && reader == nodes[i]) { //local node - //swap the local node and the node at position 0 - if (i != 0) { - swap(nodes, tempIndex, i); - } - tempIndex=1; - - if (localRackNode != -1 && (localNodeGroupNode !=-1)) { - if (localRackNode == 0) { - localRackNode = i; - } - if (localNodeGroupNode == 0) { - localNodeGroupNode = i; - } - break; - } - } else if (localNodeGroupNode == -1 && isOnSameNodeGroup(reader, - nodes[i])) { - //local node group - localNodeGroupNode = i; - // node local and rack local are already found - if(tempIndex != 0 && localRackNode != -1) break; - } else if (localRackNode == -1 && isOnSameRack(reader, nodes[i])) { - localRackNode = i; - if (tempIndex != 0 && localNodeGroupNode != -1) break; - } - } - - // swap the local nodegroup node and the node at position tempIndex - if(localNodeGroupNode != -1 && localNodeGroupNode != tempIndex) { - swap(nodes, tempIndex, localNodeGroupNode); - if (localRackNode == tempIndex) { - localRackNode = localNodeGroupNode; - } - tempIndex++; - } - - // swap the local rack node and the node at position tempIndex - if(localRackNode != -1 && localRackNode != tempIndex) { - swap(nodes, tempIndex, localRackNode); - tempIndex++; - } - } - - // put a random node at position 0 if there is not a local/local-nodegroup/ - // local-rack node - if (tempIndex == 0 && localNodeGroupNode == -1 && localRackNode == -1 - && nodes.length != 0) { - swap(nodes, 0, r.nextInt(nodes.length)); - } + super.sortByDistance(reader, nodes, seed); } /** InnerNodeWithNodeGroup represents a switch/router of a data center, rack diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java index 5fa2e14748b..c082cce14a4 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetworkTopologyWithNodeGroup.java @@ -96,7 +96,7 @@ public class TestNetworkTopologyWithNodeGroup { } @Test - public void testPseudoSortByDistance() throws Exception { + public void testSortByDistance() throws Exception { NodeBase[] testNodes = new NodeBase[4]; // array contains both local node, local node group & local rack node @@ -104,7 +104,7 @@ public class TestNetworkTopologyWithNodeGroup { testNodes[1] = dataNodes[2]; testNodes[2] = dataNodes[3]; testNodes[3] = dataNodes[0]; - cluster.pseudoSortByDistance(dataNodes[0], testNodes ); + cluster.sortByDistance(dataNodes[0], testNodes, 0xDEADBEEF); assertTrue(testNodes[0] == dataNodes[0]); assertTrue(testNodes[1] == dataNodes[1]); assertTrue(testNodes[2] == dataNodes[2]); @@ -115,7 +115,7 @@ public class TestNetworkTopologyWithNodeGroup { testNodes[1] = dataNodes[4]; testNodes[2] = dataNodes[1]; testNodes[3] = dataNodes[0]; - cluster.pseudoSortByDistance(dataNodes[0], testNodes ); + cluster.sortByDistance(dataNodes[0], testNodes, 0xDEADBEEF); assertTrue(testNodes[0] == dataNodes[0]); assertTrue(testNodes[1] == dataNodes[1]); @@ -124,7 +124,7 @@ public class TestNetworkTopologyWithNodeGroup { testNodes[1] = dataNodes[3]; testNodes[2] = dataNodes[2]; testNodes[3] = dataNodes[0]; - cluster.pseudoSortByDistance(dataNodes[0], testNodes ); + cluster.sortByDistance(dataNodes[0], testNodes, 0xDEADBEEF); assertTrue(testNodes[0] == dataNodes[0]); assertTrue(testNodes[1] == dataNodes[2]); @@ -133,7 +133,7 @@ public class TestNetworkTopologyWithNodeGroup { testNodes[1] = dataNodes[7]; testNodes[2] = dataNodes[2]; testNodes[3] = dataNodes[0]; - cluster.pseudoSortByDistance(computeNode, testNodes ); + cluster.sortByDistance(computeNode, testNodes, 0xDEADBEEF); assertTrue(testNodes[0] == dataNodes[0]); assertTrue(testNodes[1] == dataNodes[2]); }