HDFS-1765. Merging change r1213537 from trunk to 0.23

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1297865 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2012-03-07 06:54:06 +00:00
parent aec70c23aa
commit 99b0f2aa2c
5 changed files with 238 additions and 81 deletions

View File

@ -159,6 +159,9 @@ Release 0.23.3 - UNRELEASED
HDFS-2188. Make FSEditLog create its journals from a list of URIs rather
than NNStorage. (Ivan Kelly via jitendra)
HDFS-1765. Block Replication should respect under-replication
block priority. (Uma Maheswara Rao G via eli)
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -170,9 +170,6 @@ public class BlockManager {
/** variable to enable check for enough racks */
final boolean shouldCheckForEnoughRacks;
/** Last block index used for replication work. */
private int replIndex = 0;
/** for block replicas placement */
private BlockPlacementPolicy blockplacement;
@ -936,74 +933,16 @@ public class BlockManager {
* @return number of blocks scheduled for replication during this iteration.
*/
private int computeReplicationWork(int blocksToProcess) throws IOException {
// Choose the blocks to be replicated
List<List<Block>> blocksToReplicate =
chooseUnderReplicatedBlocks(blocksToProcess);
// replicate blocks
return computeReplicationWorkForBlocks(blocksToReplicate);
}
/**
* Get a list of block lists to be replicated The index of block lists
* represents the
*
* @param blocksToProcess
* @return Return a list of block lists to be replicated. The block list index
* represents its replication priority.
*/
private List<List<Block>> chooseUnderReplicatedBlocks(int blocksToProcess) {
// initialize data structure for the return value
List<List<Block>> blocksToReplicate = new ArrayList<List<Block>>(
UnderReplicatedBlocks.LEVEL);
for (int i = 0; i < UnderReplicatedBlocks.LEVEL; i++) {
blocksToReplicate.add(new ArrayList<Block>());
}
List<List<Block>> blocksToReplicate = null;
namesystem.writeLock();
try {
synchronized (neededReplications) {
if (neededReplications.size() == 0) {
return blocksToReplicate;
}
// Go through all blocks that need replications.
UnderReplicatedBlocks.BlockIterator neededReplicationsIterator =
neededReplications.iterator();
// skip to the first unprocessed block, which is at replIndex
for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
neededReplicationsIterator.next();
}
// # of blocks to process equals either twice the number of live
// data-nodes or the number of under-replicated blocks whichever is less
blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
for (int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) {
if (!neededReplicationsIterator.hasNext()) {
// start from the beginning
replIndex = 0;
blocksToProcess = Math.min(blocksToProcess, neededReplications
.size());
if (blkCnt >= blocksToProcess)
break;
neededReplicationsIterator = neededReplications.iterator();
assert neededReplicationsIterator.hasNext() : "neededReplications should not be empty.";
}
Block block = neededReplicationsIterator.next();
int priority = neededReplicationsIterator.getPriority();
if (priority < 0 || priority >= blocksToReplicate.size()) {
LOG.warn("Unexpected replication priority: "
+ priority + " " + block);
} else {
blocksToReplicate.get(priority).add(block);
}
} // end for
} // end synchronized neededReplication
// Choose the blocks to be replicated
blocksToReplicate = neededReplications
.chooseUnderReplicatedBlocks(blocksToProcess);
} finally {
namesystem.writeUnlock();
}
return blocksToReplicate;
return computeReplicationWorkForBlocks(blocksToReplicate);
}
/** Replicate a set of blocks
@ -1032,7 +971,7 @@ public class BlockManager {
// abandoned block or block reopened for append
if(fileINode == null || fileINode.isUnderConstruction()) {
neededReplications.remove(block, priority); // remove from neededReplications
replIndex--;
neededReplications.decrementReplicationIndex(priority);
continue;
}
@ -1056,7 +995,7 @@ public class BlockManager {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
(blockHasEnoughRacks(block)) ) {
neededReplications.remove(block, priority); // remove from neededReplications
replIndex--;
neededReplications.decrementReplicationIndex(priority);
NameNode.stateChangeLog.info("BLOCK* "
+ "Removing block " + block
+ " from neededReplications as it has enough replicas.");
@ -1117,7 +1056,7 @@ public class BlockManager {
if(fileINode == null || fileINode.isUnderConstruction()) {
neededReplications.remove(block, priority); // remove from neededReplications
rw.targets = null;
replIndex--;
neededReplications.decrementReplicationIndex(priority);
continue;
}
requiredReplication = fileINode.getReplication();
@ -1131,7 +1070,7 @@ public class BlockManager {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
(blockHasEnoughRacks(block)) ) {
neededReplications.remove(block, priority); // remove from neededReplications
replIndex--;
neededReplications.decrementReplicationIndex(priority);
rw.targets = null;
NameNode.stateChangeLog.info("BLOCK* "
+ "Removing block " + block
@ -1169,7 +1108,7 @@ public class BlockManager {
// remove from neededReplications
if(numEffectiveReplicas + targets.length >= requiredReplication) {
neededReplications.remove(block, priority); // remove from neededReplications
replIndex--;
neededReplications.decrementReplicationIndex(priority);
}
}
}

View File

@ -18,10 +18,11 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.Map;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
@ -84,10 +85,14 @@ class UnderReplicatedBlocks implements Iterable<Block> {
private List<LightWeightLinkedSet<Block>> priorityQueues
= new ArrayList<LightWeightLinkedSet<Block>>();
/** Stores the replication index for each priority */
private Map<Integer, Integer> priorityToReplIdx = new HashMap<Integer, Integer>(LEVEL);
/** Create an object. */
UnderReplicatedBlocks() {
for (int i = 0; i < LEVEL; i++) {
priorityQueues.add(new LightWeightLinkedSet<Block>());
priorityToReplIdx.put(i, 0);
}
}
@ -303,6 +308,70 @@ class UnderReplicatedBlocks implements Iterable<Block> {
}
}
}
/**
* Get a list of block lists to be replicated. The index of block lists
* represents its replication priority. Replication index will be tracked for
* each priority list separately in priorityToReplIdx map. Iterates through
* all priority lists and find the elements after replication index. Once the
* last priority lists reaches to end, all replication indexes will be set to
* 0 and start from 1st priority list to fulfill the blockToProces count.
*
* @param blocksToProcess - number of blocks to fetch from underReplicated blocks.
* @return Return a list of block lists to be replicated. The block list index
* represents its replication priority.
*/
public synchronized List<List<Block>> chooseUnderReplicatedBlocks(
int blocksToProcess) {
// initialize data structure for the return value
List<List<Block>> blocksToReplicate = new ArrayList<List<Block>>(LEVEL);
for (int i = 0; i < LEVEL; i++) {
blocksToReplicate.add(new ArrayList<Block>());
}
if (size() == 0) { // There are no blocks to collect.
return blocksToReplicate;
}
int blockCount = 0;
for (int priority = 0; priority < LEVEL; priority++) {
// Go through all blocks that need replications with current priority.
BlockIterator neededReplicationsIterator = iterator(priority);
Integer replIndex = priorityToReplIdx.get(priority);
// skip to the first unprocessed block, which is at replIndex
for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
neededReplicationsIterator.next();
}
blocksToProcess = Math.min(blocksToProcess, size());
if (blockCount == blocksToProcess) {
break; // break if already expected blocks are obtained
}
// Loop through all remaining blocks in the list.
while (blockCount < blocksToProcess
&& neededReplicationsIterator.hasNext()) {
Block block = neededReplicationsIterator.next();
blocksToReplicate.get(priority).add(block);
replIndex++;
blockCount++;
}
if (!neededReplicationsIterator.hasNext()
&& neededReplicationsIterator.getPriority() == LEVEL - 1) {
// reset all priorities replication index to 0 because there is no
// recently added blocks in any list.
for (int i = 0; i < LEVEL; i++) {
priorityToReplIdx.put(i, 0);
}
break;
}
priorityToReplIdx.put(priority, replIndex);
}
return blocksToReplicate;
}
/** returns an iterator of all blocks in a given priority queue */
synchronized BlockIterator iterator(int level) {
@ -383,4 +452,14 @@ class UnderReplicatedBlocks implements Iterable<Block> {
return level;
}
}
/**
* This method is to decrement the replication index for the given priority
*
* @param priority - int priority level
*/
public void decrementReplicationIndex(int priority) {
Integer replIdx = priorityToReplIdx.get(priority);
priorityToReplIdx.put(priority, --replIdx);
}
}

View File

@ -17,26 +17,32 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import junit.framework.TestCase;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.junit.Test;
public class TestReplicationPolicy extends TestCase {
public class TestReplicationPolicy {
private Random random= DFSUtil.getRandom();
private static final int BLOCK_SIZE = 1024;
private static final int NUM_OF_DATANODES = 6;
private static final Configuration CONF = new HdfsConfiguration();
@ -90,6 +96,7 @@ public class TestReplicationPolicy extends TestCase {
* the 1st is on dataNodes[0] and the 2nd is on a different rack.
* @throws Exception
*/
@Test
public void testChooseTarget1() throws Exception {
dataNodes[0].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
@ -150,6 +157,7 @@ public class TestReplicationPolicy extends TestCase {
* should be placed on a third rack.
* @throws Exception
*/
@Test
public void testChooseTarget2() throws Exception {
HashMap<Node, Node> excludedNodes;
DatanodeDescriptor[] targets;
@ -225,6 +233,7 @@ public class TestReplicationPolicy extends TestCase {
* and the rest should be placed on the third rack.
* @throws Exception
*/
@Test
public void testChooseTarget3() throws Exception {
// make data node 0 to be not qualified to choose
dataNodes[0].updateHeartbeat(
@ -278,6 +287,7 @@ public class TestReplicationPolicy extends TestCase {
* the 3rd replica should be placed on the same rack as the 1st replica,
* @throws Exception
*/
@Test
public void testChoooseTarget4() throws Exception {
// make data node 0 & 1 to be not qualified to choose: not enough disk space
for(int i=0; i<2; i++) {
@ -325,6 +335,7 @@ public class TestReplicationPolicy extends TestCase {
* the 3rd replica should be placed on the same rack as the 2nd replica,
* @throws Exception
*/
@Test
public void testChooseTarget5() throws Exception {
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename,
@ -354,6 +365,7 @@ public class TestReplicationPolicy extends TestCase {
* the 1st replica. The 3rd replica can be placed randomly.
* @throws Exception
*/
@Test
public void testRereplicate1() throws Exception {
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
chosenNodes.add(dataNodes[0]);
@ -388,6 +400,7 @@ public class TestReplicationPolicy extends TestCase {
* the rest replicas can be placed randomly,
* @throws Exception
*/
@Test
public void testRereplicate2() throws Exception {
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
chosenNodes.add(dataNodes[0]);
@ -417,6 +430,7 @@ public class TestReplicationPolicy extends TestCase {
* the rest replicas can be placed randomly,
* @throws Exception
*/
@Test
public void testRereplicate3() throws Exception {
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
chosenNodes.add(dataNodes[0]);
@ -450,4 +464,122 @@ public class TestReplicationPolicy extends TestCase {
assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
}
/**
* Test for the high priority blocks are processed before the low priority
* blocks.
*/
@Test(timeout = 60000)
public void testReplicationWithPriority() throws Exception {
int DFS_NAMENODE_REPLICATION_INTERVAL = 1000;
int HIGH_PRIORITY = 0;
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.format(true).build();
try {
cluster.waitActive();
final UnderReplicatedBlocks neededReplications = (UnderReplicatedBlocks) cluster
.getNameNode().getNamesystem().getBlockManager().neededReplications;
for (int i = 0; i < 100; i++) {
// Adding the blocks directly to normal priority
neededReplications.add(new Block(random.nextLong()), 2, 0, 3);
}
// Lets wait for the replication interval, to start process normal
// priority blocks
Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
// Adding the block directly to high priority list
neededReplications.add(new Block(random.nextLong()), 1, 0, 3);
// Lets wait for the replication interval
Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
// Check replication completed successfully. Need not wait till it process
// all the 100 normal blocks.
assertFalse("Not able to clear the element from high priority list",
neededReplications.iterator(HIGH_PRIORITY).hasNext());
} finally {
cluster.shutdown();
}
}
/**
* Test for the ChooseUnderReplicatedBlocks are processed based on priority
*/
@Test
public void testChooseUnderReplicatedBlocks() throws Exception {
UnderReplicatedBlocks underReplicatedBlocks = new UnderReplicatedBlocks();
for (int i = 0; i < 5; i++) {
// Adding QUEUE_HIGHEST_PRIORITY block
underReplicatedBlocks.add(new Block(random.nextLong()), 1, 0, 3);
// Adding QUEUE_VERY_UNDER_REPLICATED block
underReplicatedBlocks.add(new Block(random.nextLong()), 2, 0, 7);
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(new Block(random.nextLong()), 6, 0, 6);
// Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
underReplicatedBlocks.add(new Block(random.nextLong()), 5, 0, 6);
// Adding QUEUE_WITH_CORRUPT_BLOCKS block
underReplicatedBlocks.add(new Block(random.nextLong()), 0, 0, 3);
}
// Choose 6 blocks from UnderReplicatedBlocks. Then it should pick 5 blocks
// from
// QUEUE_HIGHEST_PRIORITY and 1 block from QUEUE_VERY_UNDER_REPLICATED.
List<List<Block>> chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(6);
assertTheChosenBlocks(chosenBlocks, 5, 1, 0, 0, 0);
// Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 4 blocks from
// QUEUE_VERY_UNDER_REPLICATED, 5 blocks from QUEUE_UNDER_REPLICATED and 1
// block from QUEUE_REPLICAS_BADLY_DISTRIBUTED.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(10);
assertTheChosenBlocks(chosenBlocks, 0, 4, 5, 1, 0);
// Adding QUEUE_HIGHEST_PRIORITY
underReplicatedBlocks.add(new Block(random.nextLong()), 1, 0, 3);
// Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 1 block from
// QUEUE_HIGHEST_PRIORITY, 4 blocks from QUEUE_REPLICAS_BADLY_DISTRIBUTED
// and 5 blocks from QUEUE_WITH_CORRUPT_BLOCKS.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(10);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 4, 5);
// Since it is reached to end of all lists,
// should start picking the blocks from start.
// Choose 7 blocks from UnderReplicatedBlocks. Then it should pick 6 blocks from
// QUEUE_HIGHEST_PRIORITY, 1 block from QUEUE_VERY_UNDER_REPLICATED.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(7);
assertTheChosenBlocks(chosenBlocks, 6, 1, 0, 0, 0);
}
/** asserts the chosen blocks with expected priority blocks */
private void assertTheChosenBlocks(
List<List<Block>> chosenBlocks, int firstPrioritySize,
int secondPrioritySize, int thirdPrioritySize, int fourthPrioritySize,
int fifthPrioritySize) {
assertEquals(
"Not returned the expected number of QUEUE_HIGHEST_PRIORITY blocks",
firstPrioritySize, chosenBlocks.get(
UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).size());
assertEquals(
"Not returned the expected number of QUEUE_VERY_UNDER_REPLICATED blocks",
secondPrioritySize, chosenBlocks.get(
UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).size());
assertEquals(
"Not returned the expected number of QUEUE_UNDER_REPLICATED blocks",
thirdPrioritySize, chosenBlocks.get(
UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).size());
assertEquals(
"Not returned the expected number of QUEUE_REPLICAS_BADLY_DISTRIBUTED blocks",
fourthPrioritySize, chosenBlocks.get(
UnderReplicatedBlocks.QUEUE_REPLICAS_BADLY_DISTRIBUTED).size());
assertEquals(
"Not returned the expected number of QUEUE_WITH_CORRUPT_BLOCKS blocks",
fifthPrioritySize, chosenBlocks.get(
UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS).size());
}
}

View File

@ -149,9 +149,7 @@ public class TestNameNodeMetrics {
fs.delete(file, true);
filesTotal--; // reduce the filecount for deleted file
// Wait for more than DATANODE_COUNT replication intervals to ensure all
// the blocks pending deletion are sent for deletion to the datanodes.
Thread.sleep(DFS_REPLICATION_INTERVAL * (DATANODE_COUNT + 1) * 1000);
waitForDeletion();
updateMetrics();
rb = getMetrics(NS_METRICS);
assertGauge("FilesTotal", filesTotal, rb);
@ -182,7 +180,7 @@ public class TestNameNodeMetrics {
assertGauge("PendingReplicationBlocks", 1L, rb);
assertGauge("ScheduledReplicationBlocks", 1L, rb);
fs.delete(file, true);
updateMetrics();
waitForDeletion();
rb = getMetrics(NS_METRICS);
assertGauge("CorruptBlocks", 0L, rb);
assertGauge("PendingReplicationBlocks", 0L, rb);
@ -221,9 +219,15 @@ public class TestNameNodeMetrics {
assertGauge("UnderReplicatedBlocks", 1L, rb);
assertGauge("MissingBlocks", 1L, rb);
fs.delete(file, true);
updateMetrics();
waitForDeletion();
assertGauge("UnderReplicatedBlocks", 0L, getMetrics(NS_METRICS));
}
private void waitForDeletion() throws InterruptedException {
// Wait for more than DATANODE_COUNT replication intervals to ensure all
// the blocks pending deletion are sent for deletion to the datanodes.
Thread.sleep(DFS_REPLICATION_INTERVAL * (DATANODE_COUNT + 1) * 1000);
}
@Test
public void testRenameMetrics() throws Exception {