HDFS-7300. The getMaxNodesPerRack() method in BlockPlacementPolicyDefault is flawed. Contributed by Kihwal Lee.
(cherry picked from commit 3ae84e1ba8)
This commit is contained in:
Kihwal Lee 2014-10-29 17:25:05 -05:00
parent 0c15130db2
commit 79c57e5913
5 changed files with 95 additions and 5 deletions

View File

@ -970,6 +970,10 @@ Release 2.6.0 - UNRELEASED
HDFS-7287. The OfflineImageViewer (OIV) can output invalid XML depending on HDFS-7287. The OfflineImageViewer (OIV) can output invalid XML depending on
the filename (Ravi Prakash via Colin P. McCabe) the filename (Ravi Prakash via Colin P. McCabe)
HDFS-7300. The getMaxNodesPerRack() method in BlockPlacementPolicyDefault
is flawed (kihwal)
BREAKDOWN OF HDFS-6584 ARCHIVAL STORAGE BREAKDOWN OF HDFS-6584 ARCHIVAL STORAGE
HDFS-6677. Change INodeFile and FSImage to support storage policy ID. HDFS-6677. Change INodeFile and FSImage to support storage policy ID.

View File

@ -139,13 +139,17 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>(); List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>();
boolean avoidStaleNodes = stats != null boolean avoidStaleNodes = stats != null
&& stats.isAvoidingStaleDataNodesForWrite(); && stats.isAvoidingStaleDataNodesForWrite();
int maxNodesAndReplicas[] = getMaxNodesPerRack(0, numOfReplicas);
numOfReplicas = maxNodesAndReplicas[0];
int maxNodesPerRack = maxNodesAndReplicas[1];
for (int i = 0; i < favoredNodes.size() && results.size() < numOfReplicas; i++) { for (int i = 0; i < favoredNodes.size() && results.size() < numOfReplicas; i++) {
DatanodeDescriptor favoredNode = favoredNodes.get(i); DatanodeDescriptor favoredNode = favoredNodes.get(i);
// Choose a single node which is local to favoredNode. // Choose a single node which is local to favoredNode.
// 'results' is updated within chooseLocalNode // 'results' is updated within chooseLocalNode
final DatanodeStorageInfo target = chooseLocalStorage(favoredNode, final DatanodeStorageInfo target = chooseLocalStorage(favoredNode,
favoriteAndExcludedNodes, blocksize, favoriteAndExcludedNodes, blocksize, maxNodesPerRack,
getMaxNodesPerRack(results.size(), numOfReplicas)[1],
results, avoidStaleNodes, storageTypes, false); results, avoidStaleNodes, storageTypes, false);
if (target == null) { if (target == null) {
LOG.warn("Could not find a target for file " + src LOG.warn("Could not find a target for file " + src
@ -221,6 +225,19 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
results.toArray(new DatanodeStorageInfo[results.size()])); results.toArray(new DatanodeStorageInfo[results.size()]));
} }
/**
* Calculate the maximum number of replicas to allocate per rack. It also
* limits the total number of replicas to the total number of nodes in the
* cluster. Caller should adjust the replica count to the return value.
*
* @param numOfChosen The number of already chosen nodes.
* @param numOfReplicas The number of additional nodes to allocate.
* @return integer array. Index 0: The number of nodes allowed to allocate
* in addition to already chosen nodes.
* Index 1: The maximum allowed number of nodes per rack. This
* is independent of the number of chosen nodes, as it is calculated
* using the target number of replicas.
*/
private int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) { private int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) {
int clusterSize = clusterMap.getNumOfLeaves(); int clusterSize = clusterMap.getNumOfLeaves();
int totalNumOfReplicas = numOfChosen + numOfReplicas; int totalNumOfReplicas = numOfChosen + numOfReplicas;
@ -228,7 +245,26 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
numOfReplicas -= (totalNumOfReplicas-clusterSize); numOfReplicas -= (totalNumOfReplicas-clusterSize);
totalNumOfReplicas = clusterSize; totalNumOfReplicas = clusterSize;
} }
int maxNodesPerRack = (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2; // No calculation needed when there is only one rack or picking one node.
int numOfRacks = clusterMap.getNumOfRacks();
if (numOfRacks == 1 || totalNumOfReplicas <= 1) {
return new int[] {numOfReplicas, totalNumOfReplicas};
}
int maxNodesPerRack = (totalNumOfReplicas-1)/numOfRacks + 2;
// At this point, there is more than one rack and more than one replica
// to store. Avoid placing all replicas in the same rack.
//
// maxNodesPerRack has the following properties at this stage.
// 1) maxNodesPerRack >= 2
// 2) (maxNodesPerRack-1) * numOfRacks >= totalNumOfReplicas
// when numOfRacks > 1
//
// Thus, the following adjustment will still result in a value that forces
// multi-rack allocation and leaves room for the total number of replicas.
if (maxNodesPerRack == totalNumOfReplicas) {
maxNodesPerRack--;
}
return new int[] {numOfReplicas, maxNodesPerRack}; return new int[] {numOfReplicas, maxNodesPerRack};
} }

View File

@ -188,7 +188,7 @@ public class TestFileAppendRestart {
try { try {
cluster = new MiniDFSCluster.Builder(conf).manageDataDfsDirs(true) cluster = new MiniDFSCluster.Builder(conf).manageDataDfsDirs(true)
.manageNameDfsDirs(true).numDataNodes(4) .manageNameDfsDirs(true).numDataNodes(4)
.racks(new String[] { "/rack1", "/rack1", "/rack1", "/rack2" }) .racks(new String[] { "/rack1", "/rack1", "/rack2", "/rack2" })
.build(); .build();
cluster.waitActive(); cluster.waitActive();

View File

@ -281,7 +281,8 @@ public class TestBlockManager {
assertTrue("Source of replication should be one of the nodes the block " + assertTrue("Source of replication should be one of the nodes the block " +
"was on. Was: " + pipeline[0], "was on. Was: " + pipeline[0],
origStorages.contains(pipeline[0])); origStorages.contains(pipeline[0]));
assertEquals("Should have three targets", 3, pipeline.length); // Only up to two nodes can be picked per rack when there are two racks.
assertEquals("Should have two targets", 2, pipeline.length);
boolean foundOneOnRackB = false; boolean foundOneOnRackB = false;
for (int i = 1; i < pipeline.length; i++) { for (int i = 1; i < pipeline.length; i++) {

View File

@ -429,6 +429,55 @@ public class TestReplicationPolicy {
assertFalse(isOnSameRack(targets[0], targets[1])); assertFalse(isOnSameRack(targets[0], targets[1]));
} }
/**
 * Exercises target selection for the scenario where the cluster holds enough
 * nodes in total, yet only a single rack is actually usable because
 * dataNodes[0] through dataNodes[3] are excluded.
 *
 * The request is issued twice: once as-is (asking for two targets), and once
 * after temporarily registering one extra datanode under "/d2/r3" (asking for
 * three targets). In both cases one fewer target than requested is expected.
 *
 * @throws Exception
 */
@Test
public void testChooseTarget6() throws Exception {
  final DatanodeStorageInfo extraStorage =
      DFSTestUtil.createDatanodeStorageInfo(
          "DS-xxxx", "7.7.7.7", "/d2/r3", "host7");
  final DatanodeDescriptor extraNode = extraStorage.getDatanodeDescriptor();

  final List<DatanodeStorageInfo> chosenNodes =
      new ArrayList<DatanodeStorageInfo>();
  final Set<Node> excludedNodes = new HashSet<Node>();
  for (int i = 0; i < 4; i++) {
    excludedNodes.add(dataNodes[i]);
  }

  // First round: two targets requested, exactly one expected back.
  DatanodeStorageInfo[] targets = chooseTarget(2, chosenNodes, excludedNodes);
  assertEquals(1, targets.length);

  // Register the extra datanode with the topology and the heartbeat manager,
  // then report usage for it so it has a heartbeat on record.
  final BlockManager bm = namenode.getNamesystem().getBlockManager();
  bm.getDatanodeManager().getNetworkTopology().add(extraNode);
  bm.getDatanodeManager().getHeartbeatManager().addDatanode(extraNode);
  updateHeartbeatWithUsage(extraNode,
      2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
      2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);

  // Rebuild both collections from scratch so the second round starts from
  // the same exclusions as the first.
  chosenNodes.clear();
  excludedNodes.clear();
  for (int i = 0; i < 4; i++) {
    excludedNodes.add(dataNodes[i]);
  }

  // Second round: three targets requested, exactly two expected back.
  try {
    targets = chooseTarget(3, chosenNodes, excludedNodes);
    assertEquals(2, targets.length);
  } finally {
    // Take the extra datanode back out of the topology even when the
    // assertion above fails.
    bm.getDatanodeManager().getNetworkTopology().remove(extraNode);
  }
}
/** /**
* In this testcase, it tries to choose more targets than available nodes and * In this testcase, it tries to choose more targets than available nodes and
* check the result, with stale node avoidance on the write path enabled. * check the result, with stale node avoidance on the write path enabled.