HDFS-6840. Clients are always sent to the same datanode when read is off rack. (wang)

This commit is contained in:
Andrew Wang 2014-09-18 17:49:17 -07:00
parent 20a076bafc
commit 8e73084491
9 changed files with 57 additions and 91 deletions

View File

@ -26,6 +26,7 @@ import java.util.TreeMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -689,6 +690,12 @@ public class NetworkTopology {
return rand;
}
@VisibleForTesting
void setRandomSeed(long seed) {
Random rand = getRandom();
rand.setSeed(seed);
}
/** randomly choose one node from <i>scope</i>
* if scope starts with ~, choose one from the all nodes except for the
* ones in <i>scope</i>; otherwise, choose one from <i>scope</i>
@ -870,21 +877,19 @@ public class NetworkTopology {
/**
* Sort nodes array by network distance to <i>reader</i>.
* <p/>
* In a three-level topology, a node can be either local, on the same rack, or
* on a different rack from the reader. Sorting the nodes based on network
* distance from the reader reduces network traffic and improves performance.
* In a three-level topology, a node can be either local, on the same rack,
* or on a different rack from the reader. Sorting the nodes based on network
* distance from the reader reduces network traffic and improves
* performance.
* <p/>
* As an additional twist, we also randomize the nodes at each network
* distance using the provided random seed. This helps with load balancing
* when there is data skew.
*
* @param reader Node where data will be read
* @param nodes Available replicas with the requested data
* @param seed Used to seed the pseudo-random generator that randomizes the
* set of nodes at each network distance.
* distance. This helps with load balancing when there is data skew.
*
* @param reader Node where data will be read
* @param nodes Available replicas with the requested data
* @param activeLen Number of active nodes at the front of the array
*/
public void sortByDistance(Node reader, Node[] nodes, int activeLen,
long seed, boolean randomizeBlockLocationsPerBlock) {
public void sortByDistance(Node reader, Node[] nodes, int activeLen) {
/** Sort weights for the nodes array */
int[] weights = new int[activeLen];
for (int i=0; i<activeLen; i++) {
@ -903,14 +908,7 @@ public class NetworkTopology {
list.add(node);
}
// Seed is normally the block id
// This means we use the same pseudo-random order for each block, for
// potentially better page cache usage.
// Seed is not used if we want to randomize block location for every block
Random rand = getRandom();
if (!randomizeBlockLocationsPerBlock) {
rand.setSeed(seed);
}
int idx = 0;
for (List<Node> list: tree.values()) {
if (list != null) {

View File

@ -268,19 +268,17 @@ public class NetworkTopologyWithNodeGroup extends NetworkTopology {
/**
* Sort nodes array by their distances to <i>reader</i>.
* <p/>
* This is the same as
* {@link NetworkTopology#sortByDistance(Node, Node[], long)} except with a
* four-level network topology which contains the additional network distance
* of a "node group" which is between local and same rack.
*
* @param reader Node where data will be read
* @param nodes Available replicas with the requested data
* @param seed Used to seed the pseudo-random generator that randomizes the
* set of nodes at each network distance.
* This is the same as {@link NetworkTopology#sortByDistance(Node, Node[],
* int)} except with a four-level network topology which contains the
* additional network distance of a "node group" which is between local and
* same rack.
*
* @param reader Node where data will be read
* @param nodes Available replicas with the requested data
* @param activeLen Number of active nodes at the front of the array
*/
@Override
public void sortByDistance(Node reader, Node[] nodes, int activeLen,
long seed, boolean randomizeBlockLocationsPerBlock) {
public void sortByDistance(Node reader, Node[] nodes, int activeLen) {
// If reader is not a datanode (not in NetworkTopology tree), we need to
// replace this reader with a sibling leaf node in tree.
if (reader != null && !this.contains(reader)) {
@ -293,8 +291,7 @@ public class NetworkTopologyWithNodeGroup extends NetworkTopology {
return;
}
}
super.sortByDistance(reader, nodes, activeLen, seed,
randomizeBlockLocationsPerBlock);
super.sortByDistance(reader, nodes, activeLen);
}
/** InnerNodeWithNodeGroup represents a switch/router of a data center, rack

View File

@ -104,8 +104,7 @@ public class TestNetworkTopologyWithNodeGroup {
testNodes[1] = dataNodes[2];
testNodes[2] = dataNodes[3];
testNodes[3] = dataNodes[0];
cluster.sortByDistance(dataNodes[0], testNodes,
testNodes.length, 0xDEADBEEF, false);
cluster.sortByDistance(dataNodes[0], testNodes, testNodes.length);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[1]);
assertTrue(testNodes[2] == dataNodes[2]);
@ -116,8 +115,7 @@ public class TestNetworkTopologyWithNodeGroup {
testNodes[1] = dataNodes[4];
testNodes[2] = dataNodes[1];
testNodes[3] = dataNodes[0];
cluster.sortByDistance(dataNodes[0], testNodes,
testNodes.length, 0xDEADBEEF, false);
cluster.sortByDistance(dataNodes[0], testNodes, testNodes.length);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[1]);
@ -126,8 +124,7 @@ public class TestNetworkTopologyWithNodeGroup {
testNodes[1] = dataNodes[3];
testNodes[2] = dataNodes[2];
testNodes[3] = dataNodes[0];
cluster.sortByDistance(dataNodes[0], testNodes,
testNodes.length, 0xDEADBEEF, false);
cluster.sortByDistance(dataNodes[0], testNodes, testNodes.length);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[2]);
@ -136,8 +133,7 @@ public class TestNetworkTopologyWithNodeGroup {
testNodes[1] = dataNodes[7];
testNodes[2] = dataNodes[2];
testNodes[3] = dataNodes[0];
cluster.sortByDistance(computeNode, testNodes,
testNodes.length, 0xDEADBEEF, false);
cluster.sortByDistance(computeNode, testNodes, testNodes.length);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[2]);
}

View File

@ -771,6 +771,9 @@ Release 2.6.0 - UNRELEASED
HDFS-7078. Fix listEZs to work correctly with snapshots. (wang)
HDFS-6840. Clients are always sent to the same datanode when read
is off rack. (wang)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an

View File

@ -221,9 +221,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY = "dfs.namenode.min.supported.datanode.version";
public static final String DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT = "3.0.0-SNAPSHOT";
public static final String DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK = "dfs.namenode.randomize-block-locations-per-block";
public static final boolean DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT = false;
public static final String DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY = "dfs.namenode.edits.dir.minimum";
public static final int DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT = 1;

View File

@ -348,8 +348,7 @@ public class DatanodeManager {
/** Sort the located blocks by the distance to the target host. */
public void sortLocatedBlocks(final String targethost,
final List<LocatedBlock> locatedblocks,
boolean randomizeBlockLocationsPerBlock) {
final List<LocatedBlock> locatedblocks) {
//sort the blocks
// As it is possible for the separation of node manager and datanode,
// here we should get node but not datanode only .
@ -376,8 +375,7 @@ public class DatanodeManager {
--lastActiveIndex;
}
int activeLen = lastActiveIndex + 1;
networktopology.sortByDistance(client, b.getLocations(), activeLen, b
.getBlock().getBlockId(), randomizeBlockLocationsPerBlock);
networktopology.sortByDistance(client, b.getLocations(), activeLen);
}
}

View File

@ -65,8 +65,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ENABLE_RETRY_CAC
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY;
@ -547,8 +545,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private final FSImage fsImage;
private boolean randomizeBlockLocationsPerBlock;
/**
* Notify that loading of this FSDirectory is complete, and
* it is imageLoaded for use
@ -865,10 +861,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
this.randomizeBlockLocationsPerBlock = conf.getBoolean(
DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK,
DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT);
this.dtSecretManager = createDelegationTokenSecretManager(conf);
this.dir = new FSDirectory(this, conf);
this.snapshotManager = new SnapshotManager(dir);
@ -1739,7 +1731,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
true);
if (blocks != null) {
blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine,
blocks.getLocatedBlocks(), randomizeBlockLocationsPerBlock);
blocks.getLocatedBlocks());
// lastBlock is not part of getLocatedBlocks(), might need to sort it too
LocatedBlock lastBlock = blocks.getLastLocatedBlock();
@ -1748,7 +1740,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
Lists.newArrayListWithCapacity(1);
lastBlockList.add(lastBlock);
blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine,
lastBlockList, randomizeBlockLocationsPerBlock);
lastBlockList);
}
}
return blocks;

View File

@ -2069,19 +2069,6 @@
</description>
</property>
<property>
<name>dfs.namenode.randomize-block-locations-per-block</name>
<value>false</value>
<description>When fetching replica locations of a block, the replicas
are sorted based on network distance. This configuration parameter
determines whether the replicas at the same network distance are randomly
shuffled. By default, this is false, such that repeated requests for a block's
replicas always result in the same order. This potentially improves page cache
behavior. However, for some network topologies, it is desirable to shuffle this
order for better load balancing.
</description>
</property>
<property>
<name>dfs.datanode.block.id.layout.upgrade.threads</name>
<value>12</value>

View File

@ -139,8 +139,8 @@ public class TestNetworkTopology {
testNodes[0] = dataNodes[1];
testNodes[1] = dataNodes[2];
testNodes[2] = dataNodes[0];
cluster.sortByDistance(dataNodes[0], testNodes,
testNodes.length, 0xDEADBEEF, false);
cluster.setRandomSeed(0xDEADBEEF);
cluster.sortByDistance(dataNodes[0], testNodes, testNodes.length);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[1]);
assertTrue(testNodes[2] == dataNodes[2]);
@ -152,8 +152,8 @@ public class TestNetworkTopology {
dtestNodes[2] = dataNodes[11];
dtestNodes[3] = dataNodes[9];
dtestNodes[4] = dataNodes[10];
cluster.sortByDistance(dataNodes[8], dtestNodes,
dtestNodes.length - 2, 0xDEADBEEF, false);
cluster.setRandomSeed(0xDEADBEEF);
cluster.sortByDistance(dataNodes[8], dtestNodes, dtestNodes.length - 2);
assertTrue(dtestNodes[0] == dataNodes[8]);
assertTrue(dtestNodes[1] == dataNodes[11]);
assertTrue(dtestNodes[2] == dataNodes[12]);
@ -164,8 +164,8 @@ public class TestNetworkTopology {
testNodes[0] = dataNodes[1];
testNodes[1] = dataNodes[3];
testNodes[2] = dataNodes[0];
cluster.sortByDistance(dataNodes[0], testNodes,
testNodes.length, 0xDEADBEEF, false);
cluster.setRandomSeed(0xDEADBEEF);
cluster.sortByDistance(dataNodes[0], testNodes, testNodes.length);
assertTrue(testNodes[0] == dataNodes[0]);
assertTrue(testNodes[1] == dataNodes[1]);
assertTrue(testNodes[2] == dataNodes[3]);
@ -174,8 +174,8 @@ public class TestNetworkTopology {
testNodes[0] = dataNodes[5];
testNodes[1] = dataNodes[3];
testNodes[2] = dataNodes[1];
cluster.sortByDistance(dataNodes[0], testNodes,
testNodes.length, 0xDEADBEEF, false);
cluster.setRandomSeed(0xDEADBEEF);
cluster.sortByDistance(dataNodes[0], testNodes, testNodes.length);
assertTrue(testNodes[0] == dataNodes[1]);
assertTrue(testNodes[1] == dataNodes[3]);
assertTrue(testNodes[2] == dataNodes[5]);
@ -184,8 +184,8 @@ public class TestNetworkTopology {
testNodes[0] = dataNodes[1];
testNodes[1] = dataNodes[5];
testNodes[2] = dataNodes[3];
cluster.sortByDistance(dataNodes[0], testNodes,
testNodes.length, 0xDEADBEEF, false);
cluster.setRandomSeed(0xDEADBEEF);
cluster.sortByDistance(dataNodes[0], testNodes, testNodes.length);
assertTrue(testNodes[0] == dataNodes[1]);
assertTrue(testNodes[1] == dataNodes[3]);
assertTrue(testNodes[2] == dataNodes[5]);
@ -194,24 +194,23 @@ public class TestNetworkTopology {
testNodes[0] = dataNodes[1];
testNodes[1] = dataNodes[5];
testNodes[2] = dataNodes[3];
cluster.sortByDistance(dataNodes[0], testNodes,
testNodes.length, 0xDEAD, false);
cluster.setRandomSeed(0xDEAD);
cluster.sortByDistance(dataNodes[0], testNodes, testNodes.length);
// sortByDistance does not take the "data center" layer into consideration
// and it doesn't sort by getDistance, so 1, 5, 3 is also valid here
assertTrue(testNodes[0] == dataNodes[1]);
assertTrue(testNodes[1] == dataNodes[5]);
assertTrue(testNodes[2] == dataNodes[3]);
// Array is just local rack nodes
// Expect a random first node depending on the seed (normally the block ID).
// Array of just rack-local nodes
// Expect a random first node
DatanodeDescriptor first = null;
boolean foundRandom = false;
for (int i=5; i<=7; i++) {
testNodes[0] = dataNodes[5];
testNodes[1] = dataNodes[6];
testNodes[2] = dataNodes[7];
cluster.sortByDistance(dataNodes[i], testNodes,
testNodes.length, 0xBEADED+i, false);
cluster.sortByDistance(dataNodes[i], testNodes, testNodes.length);
if (first == null) {
first = testNodes[0];
} else {
@ -222,16 +221,15 @@ public class TestNetworkTopology {
}
}
assertTrue("Expected to find a different first location", foundRandom);
// Array of rack local nodes with randomizeBlockLocationsPerBlock set to
// true
// Expect random order of block locations for same block
// Array of just remote nodes
// Expect random first node
first = null;
for (int i = 1; i <= 4; i++) {
testNodes[0] = dataNodes[13];
testNodes[1] = dataNodes[14];
testNodes[2] = dataNodes[15];
cluster.sortByDistance(dataNodes[15 + i], testNodes, testNodes.length,
0xBEADED, true);
cluster.sortByDistance(dataNodes[i], testNodes, testNodes.length);
if (first == null) {
first = testNodes[0];
} else {