HDFS-1765. Block Replication should respect under-replication block priority. Contributed by Uma Maheswara Rao G
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1213537 13f79535-47bb-0310-9956-ffa450edef68

parent 13345f3a85
commit 43100e9c0e

@@ -233,6 +233,9 @@ Release 0.23.1 - UNRELEASED
     HDFS-2590. Fix the missing links in the WebHDFS forrest doc. (szetszwo)
 
     HDFS-2596. TestDirectoryScanner doesn't test parallel scans. (eli)
 
+    HDFS-1765. Block Replication should respect under-replication
+    block priority. (Uma Maheswara Rao G via eli)
+
 Release 0.23.0 - 2011-11-01
 
@@ -168,9 +168,6 @@ public class BlockManager {
   /** variable to enable check for enough racks */
   final boolean shouldCheckForEnoughRacks;
 
-  /** Last block index used for replication work. */
-  private int replIndex = 0;
-
   /** for block replicas placement */
   private BlockPlacementPolicy blockplacement;
 
@@ -923,74 +920,16 @@ public class BlockManager {
    * @return number of blocks scheduled for replication during this iteration.
    */
   private int computeReplicationWork(int blocksToProcess) throws IOException {
-    // Choose the blocks to be replicated
-    List<List<Block>> blocksToReplicate =
-      chooseUnderReplicatedBlocks(blocksToProcess);
-
-    // replicate blocks
-    return computeReplicationWorkForBlocks(blocksToReplicate);
-  }
-
-  /**
-   * Get a list of block lists to be replicated The index of block lists
-   * represents the
-   *
-   * @param blocksToProcess
-   * @return Return a list of block lists to be replicated. The block list index
-   *         represents its replication priority.
-   */
-  private List<List<Block>> chooseUnderReplicatedBlocks(int blocksToProcess) {
-    // initialize data structure for the return value
-    List<List<Block>> blocksToReplicate = new ArrayList<List<Block>>(
-        UnderReplicatedBlocks.LEVEL);
-    for (int i = 0; i < UnderReplicatedBlocks.LEVEL; i++) {
-      blocksToReplicate.add(new ArrayList<Block>());
-    }
+    List<List<Block>> blocksToReplicate = null;
     namesystem.writeLock();
     try {
-      synchronized (neededReplications) {
-        if (neededReplications.size() == 0) {
-          return blocksToReplicate;
-        }
-
-        // Go through all blocks that need replications.
-        UnderReplicatedBlocks.BlockIterator neededReplicationsIterator =
-          neededReplications.iterator();
-        // skip to the first unprocessed block, which is at replIndex
-        for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
-          neededReplicationsIterator.next();
-        }
-        // # of blocks to process equals either twice the number of live
-        // data-nodes or the number of under-replicated blocks whichever is less
-        blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
-
-        for (int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) {
-          if (!neededReplicationsIterator.hasNext()) {
-            // start from the beginning
-            replIndex = 0;
-            blocksToProcess = Math.min(blocksToProcess, neededReplications
-                .size());
-            if (blkCnt >= blocksToProcess)
-              break;
-            neededReplicationsIterator = neededReplications.iterator();
-            assert neededReplicationsIterator.hasNext() : "neededReplications should not be empty.";
-          }
-
-          Block block = neededReplicationsIterator.next();
-          int priority = neededReplicationsIterator.getPriority();
-          if (priority < 0 || priority >= blocksToReplicate.size()) {
-            LOG.warn("Unexpected replication priority: "
-                + priority + " " + block);
-          } else {
-            blocksToReplicate.get(priority).add(block);
-          }
-        } // end for
-      } // end synchronized neededReplication
+      // Choose the blocks to be replicated
+      blocksToReplicate = neededReplications
+          .chooseUnderReplicatedBlocks(blocksToProcess);
     } finally {
       namesystem.writeUnlock();
     }
-
-    return blocksToReplicate;
+    return computeReplicationWorkForBlocks(blocksToReplicate);
   }
 
   /** Replicate a set of blocks
 
@@ -1019,7 +958,7 @@ public class BlockManager {
       // abandoned block or block reopened for append
       if(fileINode == null || fileINode.isUnderConstruction()) {
         neededReplications.remove(block, priority); // remove from neededReplications
-        replIndex--;
+        neededReplications.decrementReplicationIndex(priority);
         continue;
       }
 
@@ -1043,7 +982,7 @@ public class BlockManager {
       if ( (pendingReplications.getNumReplicas(block) > 0) ||
            (blockHasEnoughRacks(block)) ) {
         neededReplications.remove(block, priority); // remove from neededReplications
-        replIndex--;
+        neededReplications.decrementReplicationIndex(priority);
         NameNode.stateChangeLog.info("BLOCK* "
             + "Removing block " + block
             + " from neededReplications as it has enough replicas.");
 
@@ -1104,7 +1043,7 @@ public class BlockManager {
         if(fileINode == null || fileINode.isUnderConstruction()) {
           neededReplications.remove(block, priority); // remove from neededReplications
           rw.targets = null;
-          replIndex--;
+          neededReplications.decrementReplicationIndex(priority);
           continue;
         }
         requiredReplication = fileINode.getReplication();
 
@@ -1118,7 +1057,7 @@ public class BlockManager {
         if ( (pendingReplications.getNumReplicas(block) > 0) ||
              (blockHasEnoughRacks(block)) ) {
           neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
+          neededReplications.decrementReplicationIndex(priority);
           rw.targets = null;
           NameNode.stateChangeLog.info("BLOCK* "
               + "Removing block " + block
 
@@ -1156,7 +1095,7 @@ public class BlockManager {
           // remove from neededReplications
           if(numEffectiveReplicas + targets.length >= requiredReplication) {
             neededReplications.remove(block, priority); // remove from neededReplications
-            replIndex--;
+            neededReplications.decrementReplicationIndex(priority);
           }
         }
       }
 
@@ -18,8 +18,11 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 
@@ -81,10 +84,14 @@ class UnderReplicatedBlocks implements Iterable<Block> {
   private List<LightWeightLinkedSet<Block>> priorityQueues
       = new ArrayList<LightWeightLinkedSet<Block>>();
 
+  /** Stores the replication index for each priority */
+  private Map<Integer, Integer> priorityToReplIdx = new HashMap<Integer, Integer>(LEVEL);
+
   /** Create an object. */
   UnderReplicatedBlocks() {
     for (int i = 0; i < LEVEL; i++) {
       priorityQueues.add(new LightWeightLinkedSet<Block>());
+      priorityToReplIdx.put(i, 0);
     }
   }
 
@@ -300,6 +307,70 @@ class UnderReplicatedBlocks implements Iterable<Block> {
       }
     }
   }
 
+  /**
+   * Get a list of block lists to be replicated. The index of a block list
+   * represents its replication priority. The replication index is tracked
+   * for each priority list separately in the priorityToReplIdx map. Iterates
+   * through all priority lists and collects the elements after the saved
+   * replication index. Once the last priority list reaches its end, all
+   * replication indexes are reset to 0 and collection starts again from the
+   * first priority list until the blocksToProcess count is fulfilled.
+   *
+   * @param blocksToProcess - number of blocks to fetch from underReplicated blocks.
+   * @return Return a list of block lists to be replicated. The block list index
+   *         represents its replication priority.
+   */
+  public synchronized List<List<Block>> chooseUnderReplicatedBlocks(
+      int blocksToProcess) {
+    // initialize data structure for the return value
+    List<List<Block>> blocksToReplicate = new ArrayList<List<Block>>(LEVEL);
+    for (int i = 0; i < LEVEL; i++) {
+      blocksToReplicate.add(new ArrayList<Block>());
+    }
+
+    if (size() == 0) { // There are no blocks to collect.
+      return blocksToReplicate;
+    }
+
+    int blockCount = 0;
+    for (int priority = 0; priority < LEVEL; priority++) {
+      // Go through all blocks that need replications with current priority.
+      BlockIterator neededReplicationsIterator = iterator(priority);
+      Integer replIndex = priorityToReplIdx.get(priority);
+
+      // skip to the first unprocessed block, which is at replIndex
+      for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
+        neededReplicationsIterator.next();
+      }
+
+      blocksToProcess = Math.min(blocksToProcess, size());
+
+      if (blockCount == blocksToProcess) {
+        break; // break if the expected number of blocks has been obtained
+      }
+
+      // Loop through all remaining blocks in the list.
+      while (blockCount < blocksToProcess
+          && neededReplicationsIterator.hasNext()) {
+        Block block = neededReplicationsIterator.next();
+        blocksToReplicate.get(priority).add(block);
+        replIndex++;
+        blockCount++;
+      }
+
+      if (!neededReplicationsIterator.hasNext()
+          && neededReplicationsIterator.getPriority() == LEVEL - 1) {
+        // reset all priorities' replication indexes to 0 because there are no
+        // recently added blocks in any list.
+        for (int i = 0; i < LEVEL; i++) {
+          priorityToReplIdx.put(i, 0);
+        }
+        break;
+      }
+      priorityToReplIdx.put(priority, replIndex);
+    }
+    return blocksToReplicate;
+  }
+
   /** returns an iterator of all blocks in a given priority queue */
   synchronized BlockIterator iterator(int level) {
 
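A note on the hunk above: the selection is a per-priority round-robin with one saved cursor per priority queue. The sketch below is not part of the commit; it is a minimal, self-contained model of that loop, with integer "blocks", plain java.util.List queues standing in for the LightWeightLinkedSet-backed priority queues, and invented names (PrioritySelectionSketch, choose) standing in for the real API.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class PrioritySelectionSketch {
      static final int LEVEL = 5;
      static List<List<Integer>> queues = new ArrayList<List<Integer>>();
      static Map<Integer, Integer> replIdx = new HashMap<Integer, Integer>();

      // Models chooseUnderReplicatedBlocks: walk the queues in priority order,
      // resuming each one at its saved cursor.
      static List<List<Integer>> choose(int blocksToProcess) {
        List<List<Integer>> chosen = new ArrayList<List<Integer>>();
        for (int i = 0; i < LEVEL; i++) {
          chosen.add(new ArrayList<Integer>());
        }
        int count = 0;
        for (int priority = 0; priority < LEVEL && count < blocksToProcess; priority++) {
          List<Integer> queue = queues.get(priority);
          int idx = replIdx.get(priority); // resume where the last call stopped
          while (idx < queue.size() && count < blocksToProcess) {
            chosen.get(priority).add(queue.get(idx++));
            count++;
          }
          if (priority == LEVEL - 1 && idx >= queue.size()) {
            // Even the lowest priority ran dry: wrap every cursor to 0 so the
            // next call starts over from the highest priority.
            for (int i = 0; i < LEVEL; i++) {
              replIdx.put(i, 0);
            }
          } else {
            replIdx.put(priority, idx);
          }
        }
        return chosen;
      }

      public static void main(String[] args) {
        for (int p = 0; p < LEVEL; p++) {
          queues.add(new ArrayList<Integer>());
          replIdx.put(p, 0);
          for (int b = 0; b < 5; b++) {
            queues.get(p).add(p * 10 + b); // five blocks per priority
          }
        }
        System.out.println(choose(6));  // [[0, 1, 2, 3, 4], [10], [], [], []]
        System.out.println(choose(10)); // [[], [11, 12, 13, 14], [20, 21, 22, 23, 24], [30], []]
      }
    }

The two printed lines show the 5+1 and 0+4+5+1 pattern that testChooseUnderReplicatedBlocks, later in this commit, asserts against the real implementation.
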
@@ -380,4 +451,14 @@ class UnderReplicatedBlocks implements Iterable<Block> {
       return level;
     }
   }
+
+  /**
+   * Decrements the replication index for the given priority.
+   *
+   * @param priority - int priority level
+   */
+  public void decrementReplicationIndex(int priority) {
+    Integer replIdx = priorityToReplIdx.get(priority);
+    priorityToReplIdx.put(priority, --replIdx);
+  }
 }
 
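Why the decrement in each BlockManager hunk earlier: blocks are removed from a priority queue while the queue's saved cursor is still live, and deleting an element that precedes the cursor shifts every later element one position left, so an unadjusted cursor would silently skip the next block on the following scan. A toy illustration (invented names, not HDFS code):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class CursorDecrementSketch {
      public static void main(String[] args) {
        List<String> queue =
            new ArrayList<String>(Arrays.asList("b1", "b2", "b3"));
        int cursor = 1;      // "b1" was just visited; the next scan starts at "b2"
        queue.remove("b1");  // "b1" got scheduled and leaves the queue
        // Without the decrement, queue.get(cursor) would now be "b3",
        // silently skipping "b2".
        cursor--;            // mirrors decrementReplicationIndex(priority)
        System.out.println(queue.get(cursor)); // prints b2
      }
    }
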
@@ -17,26 +17,32 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.junit.Assert.*;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Random;
 
-import junit.framework.TestCase;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
+import org.junit.Test;
 
-public class TestReplicationPolicy extends TestCase {
+public class TestReplicationPolicy {
+  private Random random = DFSUtil.getRandom();
   private static final int BLOCK_SIZE = 1024;
   private static final int NUM_OF_DATANODES = 6;
   private static final Configuration CONF = new HdfsConfiguration();
 
@@ -90,6 +96,7 @@ public class TestReplicationPolicy extends TestCase {
    * the 1st is on dataNodes[0] and the 2nd is on a different rack.
    * @throws Exception
    */
+  @Test
   public void testChooseTarget1() throws Exception {
     dataNodes[0].updateHeartbeat(
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
 
@@ -150,6 +157,7 @@ public class TestReplicationPolicy extends TestCase {
    * should be placed on a third rack.
    * @throws Exception
    */
+  @Test
   public void testChooseTarget2() throws Exception {
     HashMap<Node, Node> excludedNodes;
     DatanodeDescriptor[] targets;
 
@@ -225,6 +233,7 @@ public class TestReplicationPolicy extends TestCase {
    * and the rest should be placed on the third rack.
    * @throws Exception
    */
+  @Test
   public void testChooseTarget3() throws Exception {
     // make data node 0 to be not qualified to choose
     dataNodes[0].updateHeartbeat(
 
@@ -278,6 +287,7 @@ public class TestReplicationPolicy extends TestCase {
    * the 3rd replica should be placed on the same rack as the 1st replica,
    * @throws Exception
    */
+  @Test
   public void testChoooseTarget4() throws Exception {
     // make data node 0 & 1 to be not qualified to choose: not enough disk space
     for(int i=0; i<2; i++) {
 
@@ -325,6 +335,7 @@ public class TestReplicationPolicy extends TestCase {
    * the 3rd replica should be placed on the same rack as the 2nd replica,
    * @throws Exception
    */
+  @Test
   public void testChooseTarget5() throws Exception {
     DatanodeDescriptor[] targets;
     targets = replicator.chooseTarget(filename,
 
@@ -354,6 +365,7 @@ public class TestReplicationPolicy extends TestCase {
    * the 1st replica. The 3rd replica can be placed randomly.
    * @throws Exception
    */
+  @Test
   public void testRereplicate1() throws Exception {
     List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
     chosenNodes.add(dataNodes[0]);
 
@@ -388,6 +400,7 @@ public class TestReplicationPolicy extends TestCase {
    * the rest replicas can be placed randomly,
    * @throws Exception
    */
+  @Test
   public void testRereplicate2() throws Exception {
     List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
     chosenNodes.add(dataNodes[0]);
 
@@ -417,6 +430,7 @@ public class TestReplicationPolicy extends TestCase {
    * the rest replicas can be placed randomly,
    * @throws Exception
    */
+  @Test
   public void testRereplicate3() throws Exception {
     List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
     chosenNodes.add(dataNodes[0]);
 
@@ -450,4 +464,122 @@ public class TestReplicationPolicy extends TestCase {
     assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
   }
+
+  /**
+   * Test that high priority blocks are processed before low priority blocks.
+   */
+  @Test(timeout = 60000)
+  public void testReplicationWithPriority() throws Exception {
+    int DFS_NAMENODE_REPLICATION_INTERVAL = 1000;
+    int HIGH_PRIORITY = 0;
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+        .format(true).build();
+    try {
+      cluster.waitActive();
+      final UnderReplicatedBlocks neededReplications = (UnderReplicatedBlocks) cluster
+          .getNameNode().getNamesystem().getBlockManager().neededReplications;
+      for (int i = 0; i < 100; i++) {
+        // Add the blocks directly to the normal priority queue
+        neededReplications.add(new Block(random.nextLong()), 2, 0, 3);
+      }
+      // Wait for the replication interval, so that processing of the normal
+      // priority blocks starts
+      Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
+
+      // Add a block directly to the high priority queue
+      neededReplications.add(new Block(random.nextLong()), 1, 0, 3);
+
+      // Wait for the replication interval
+      Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
+
+      // Check that replication of the high priority block completed; there is
+      // no need to wait until all 100 normal priority blocks are processed.
+      assertFalse("Not able to clear the element from high priority list",
+          neededReplications.iterator(HIGH_PRIORITY).hasNext());
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test that chooseUnderReplicatedBlocks selects blocks based on priority.
+   */
+  @Test
+  public void testChooseUnderReplicatedBlocks() throws Exception {
+    UnderReplicatedBlocks underReplicatedBlocks = new UnderReplicatedBlocks();
+
+    for (int i = 0; i < 5; i++) {
+      // Add a QUEUE_HIGHEST_PRIORITY block
+      underReplicatedBlocks.add(new Block(random.nextLong()), 1, 0, 3);
+
+      // Add a QUEUE_VERY_UNDER_REPLICATED block
+      underReplicatedBlocks.add(new Block(random.nextLong()), 2, 0, 7);
+
+      // Add a QUEUE_UNDER_REPLICATED block
+      underReplicatedBlocks.add(new Block(random.nextLong()), 6, 0, 6);
+
+      // Add a QUEUE_REPLICAS_BADLY_DISTRIBUTED block
+      underReplicatedBlocks.add(new Block(random.nextLong()), 5, 0, 6);
+
+      // Add a QUEUE_WITH_CORRUPT_BLOCKS block
+      underReplicatedBlocks.add(new Block(random.nextLong()), 0, 0, 3);
+    }
+
+    // Choose 6 blocks from UnderReplicatedBlocks. It should pick 5 blocks from
+    // QUEUE_HIGHEST_PRIORITY and 1 block from QUEUE_VERY_UNDER_REPLICATED.
+    List<List<Block>> chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(6);
+    assertTheChosenBlocks(chosenBlocks, 5, 1, 0, 0, 0);
+
+    // Choose 10 blocks from UnderReplicatedBlocks. It should pick 4 blocks from
+    // QUEUE_VERY_UNDER_REPLICATED, 5 blocks from QUEUE_UNDER_REPLICATED and 1
+    // block from QUEUE_REPLICAS_BADLY_DISTRIBUTED.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(10);
+    assertTheChosenBlocks(chosenBlocks, 0, 4, 5, 1, 0);
+
+    // Add a QUEUE_HIGHEST_PRIORITY block
+    underReplicatedBlocks.add(new Block(random.nextLong()), 1, 0, 3);
+
+    // Choose 10 blocks from UnderReplicatedBlocks. It should pick 1 block from
+    // QUEUE_HIGHEST_PRIORITY, 4 blocks from QUEUE_REPLICAS_BADLY_DISTRIBUTED
+    // and 5 blocks from QUEUE_WITH_CORRUPT_BLOCKS.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(10);
+    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 4, 5);
+
+    // Since the end of all the lists has been reached, picking should start
+    // over from the beginning.
+    // Choose 7 blocks from UnderReplicatedBlocks. It should pick 6 blocks from
+    // QUEUE_HIGHEST_PRIORITY and 1 block from QUEUE_VERY_UNDER_REPLICATED.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(7);
+    assertTheChosenBlocks(chosenBlocks, 6, 1, 0, 0, 0);
+  }
+
+  /** asserts that the chosen blocks match the expected per-priority counts */
+  private void assertTheChosenBlocks(
+      List<List<Block>> chosenBlocks, int firstPrioritySize,
+      int secondPrioritySize, int thirdPrioritySize, int fourthPrioritySize,
+      int fifthPrioritySize) {
+    assertEquals(
+        "Not returned the expected number of QUEUE_HIGHEST_PRIORITY blocks",
+        firstPrioritySize, chosenBlocks.get(
+            UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).size());
+    assertEquals(
+        "Not returned the expected number of QUEUE_VERY_UNDER_REPLICATED blocks",
+        secondPrioritySize, chosenBlocks.get(
+            UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).size());
+    assertEquals(
+        "Not returned the expected number of QUEUE_UNDER_REPLICATED blocks",
+        thirdPrioritySize, chosenBlocks.get(
+            UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).size());
+    assertEquals(
+        "Not returned the expected number of QUEUE_REPLICAS_BADLY_DISTRIBUTED blocks",
+        fourthPrioritySize, chosenBlocks.get(
+            UnderReplicatedBlocks.QUEUE_REPLICAS_BADLY_DISTRIBUTED).size());
+    assertEquals(
+        "Not returned the expected number of QUEUE_WITH_CORRUPT_BLOCKS blocks",
+        fifthPrioritySize, chosenBlocks.get(
+            UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS).size());
+  }
 }
 
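A worked trace may help in checking the expected counts asserted above. Q0 through Q4 abbreviate the five queues from QUEUE_HIGHEST_PRIORITY to QUEUE_WITH_CORRUPT_BLOCKS (each seeded with five blocks); the cursor column is inferred from the chooseUnderReplicatedBlocks logic earlier in this commit, not printed by the test itself.

    call                             Q0  Q1  Q2  Q3  Q4   saved cursors afterwards
    chooseUnderReplicatedBlocks(6)    5   1   0   0   0   Q0=5, Q1=1
    chooseUnderReplicatedBlocks(10)   0   4   5   1   0   Q0=5, Q1=5, Q2=5, Q3=1
    (one block added to Q0)
    chooseUnderReplicatedBlocks(10)   1   0   0   4   5   all reset to 0 (last queue exhausted)
    chooseUnderReplicatedBlocks(7)    6   1   0   0   0   Q0=6, Q1=1
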
@@ -145,9 +145,7 @@ public class TestNameNodeMetrics extends TestCase {
     fs.delete(file, true);
     filesTotal--; // reduce the filecount for deleted file
 
-    // Wait for more than DATANODE_COUNT replication intervals to ensure all
-    // the blocks pending deletion are sent for deletion to the datanodes.
-    Thread.sleep(DFS_REPLICATION_INTERVAL * (DATANODE_COUNT + 1) * 1000);
+    waitForDeletion();
     updateMetrics();
     rb = getMetrics(NS_METRICS);
     assertGauge("FilesTotal", filesTotal, rb);
 
@@ -176,7 +174,7 @@ public class TestNameNodeMetrics extends TestCase {
     assertGauge("PendingReplicationBlocks", 1L, rb);
     assertGauge("ScheduledReplicationBlocks", 1L, rb);
     fs.delete(file, true);
-    updateMetrics();
+    waitForDeletion();
     rb = getMetrics(NS_METRICS);
     assertGauge("CorruptBlocks", 0L, rb);
     assertGauge("PendingReplicationBlocks", 0L, rb);
 
@@ -212,9 +210,15 @@ public class TestNameNodeMetrics extends TestCase {
     assertGauge("UnderReplicatedBlocks", 1L, rb);
     assertGauge("MissingBlocks", 1L, rb);
     fs.delete(file, true);
-    updateMetrics();
+    waitForDeletion();
     assertGauge("UnderReplicatedBlocks", 0L, getMetrics(NS_METRICS));
   }
+
+  private void waitForDeletion() throws InterruptedException {
+    // Wait for more than DATANODE_COUNT replication intervals to ensure all
+    // the blocks pending deletion are sent for deletion to the datanodes.
+    Thread.sleep(DFS_REPLICATION_INTERVAL * (DATANODE_COUNT + 1) * 1000);
+  }
+
   public void testRenameMetrics() throws Exception {
     Path src = getTestPath("src");