HDFS-4366. Block Replication Policy Implementation May Skip Higher-Priority Blocks for Lower-Priority Blocks. Contributed by Derek Dagit.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1510724 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Kihwal Lee 2013-08-05 20:48:59 +00:00
parent c4fb095d4a
commit ccdb978603
6 changed files with 325 additions and 42 deletions

View File

@ -262,6 +262,9 @@ Release 2.3.0 - UNRELEASED
HDFS-5035. getFileLinkStatus and rename do not correctly check permissions
of symlinks. (Andrew Wang via Colin Patrick McCabe)
HDFS-4366. Block Replication Policy Implementation May Skip Higher-Priority
Blocks for Lower-Priority Blocks (Derek Dagit via kihwal)
Release 2.1.1-beta - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -1208,7 +1208,6 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
// abandoned block or block reopened for append
if(bc == null || bc instanceof MutableBlockCollection) {
neededReplications.remove(block, priority); // remove from neededReplications
neededReplications.decrementReplicationIndex(priority);
continue;
}
@ -1235,7 +1234,6 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
(blockHasEnoughRacks(block)) ) {
neededReplications.remove(block, priority); // remove from neededReplications
neededReplications.decrementReplicationIndex(priority);
blockLog.info("BLOCK* Removing " + block
+ " from neededReplications as it has enough replicas");
continue;
@ -1295,7 +1293,6 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
if(bc == null || bc instanceof MutableBlockCollection) {
neededReplications.remove(block, priority); // remove from neededReplications
rw.targets = null;
neededReplications.decrementReplicationIndex(priority);
continue;
}
requiredReplication = bc.getBlockReplication();
@ -1309,7 +1306,6 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
(blockHasEnoughRacks(block)) ) {
neededReplications.remove(block, priority); // remove from neededReplications
neededReplications.decrementReplicationIndex(priority);
rw.targets = null;
blockLog.info("BLOCK* Removing " + block
+ " from neededReplications as it has enough replicas");
@ -1346,7 +1342,6 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
// remove from neededReplications
if(numEffectiveReplicas + targets.length >= requiredReplication) {
neededReplications.remove(block, priority); // remove from neededReplications
neededReplications.decrementReplicationIndex(priority);
}
}
}

View File

@ -18,11 +18,8 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@ -82,16 +79,12 @@ class UnderReplicatedBlocks implements Iterable<Block> {
static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
/** the queues themselves */
private List<LightWeightLinkedSet<Block>> priorityQueues
= new ArrayList<LightWeightLinkedSet<Block>>();
= new ArrayList<LightWeightLinkedSet<Block>>(LEVEL);
/** Stores the replication index for each priority */
private Map<Integer, Integer> priorityToReplIdx = new HashMap<Integer, Integer>(LEVEL);
/** Create an object. */
UnderReplicatedBlocks() {
for (int i = 0; i < LEVEL; i++) {
priorityQueues.add(new LightWeightLinkedSet<Block>());
priorityToReplIdx.put(i, 0);
}
}
@ -310,13 +303,15 @@ synchronized void update(Block block, int curReplicas,
/**
* Get a list of block lists to be replicated. The index of block lists
* represents its replication priority. Replication index will be tracked for
* each priority list separately in priorityToReplIdx map. Iterates through
* all priority lists and find the elements after replication index. Once the
* last priority lists reaches to end, all replication indexes will be set to
* 0 and start from 1st priority list to fulfill the blockToProces count.
*
* @param blocksToProcess - number of blocks to fetch from underReplicated blocks.
* represents its replication priority. Iterates each block list in priority
* order beginning with the highest priority list. Iterators use a bookmark to
* resume where the previous iteration stopped. Returns when the block count
* is met or iteration reaches the end of the lowest priority list, in which
* case bookmarks for each block list are reset to the heads of their
* respective lists.
*
* @param blocksToProcess - number of blocks to fetch from underReplicated
* blocks.
* @return Return a list of block lists to be replicated. The block list index
* represents its replication priority.
*/
@ -336,12 +331,8 @@ public synchronized List<List<Block>> chooseUnderReplicatedBlocks(
for (int priority = 0; priority < LEVEL; priority++) {
// Go through all blocks that need replications with current priority.
BlockIterator neededReplicationsIterator = iterator(priority);
Integer replIndex = priorityToReplIdx.get(priority);
// skip to the first unprocessed block, which is at replIndex
for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
neededReplicationsIterator.next();
}
// Set the iterator to the first unprocessed block at this priority level.
neededReplicationsIterator.setToBookmark();
blocksToProcess = Math.min(blocksToProcess, size());
@ -354,20 +345,18 @@ public synchronized List<List<Block>> chooseUnderReplicatedBlocks(
&& neededReplicationsIterator.hasNext()) {
Block block = neededReplicationsIterator.next();
blocksToReplicate.get(priority).add(block);
replIndex++;
blockCount++;
}
if (!neededReplicationsIterator.hasNext()
&& neededReplicationsIterator.getPriority() == LEVEL - 1) {
// reset all priorities replication index to 0 because there is no
// recently added blocks in any list.
// Reset all priorities' bookmarks to the beginning because there were
// no recently added blocks in any list.
for (int i = 0; i < LEVEL; i++) {
priorityToReplIdx.put(i, 0);
this.priorityQueues.get(i).resetBookmark();
}
break;
}
priorityToReplIdx.put(priority, replIndex);
}
return blocksToReplicate;
}
@ -450,15 +439,19 @@ public void remove() {
int getPriority() {
return level;
}
}
/**
* This method is to decrement the replication index for the given priority
*
* @param priority - int priority level
*/
public void decrementReplicationIndex(int priority) {
Integer replIdx = priorityToReplIdx.get(priority);
priorityToReplIdx.put(priority, --replIdx);
/**
* Sets iterator(s) to bookmarked elements.
*/
private synchronized void setToBookmark() {
if (this.isIteratorForLevel) {
this.iterators.set(0, priorityQueues.get(this.level)
.getBookmark());
} else {
for(int i=0; i<LEVEL; i++) {
this.iterators.set(i, priorityQueues.get(i).getBookmark());
}
}
}
}
}

View File

@ -56,6 +56,8 @@ public String toString() {
private DoubleLinkedElement<T> head;
private DoubleLinkedElement<T> tail;
private LinkedSetIterator bookmark;
/**
* @param initCapacity
* Recommended size of the internal array.
@ -69,6 +71,7 @@ public LightWeightLinkedSet(int initCapacity, float maxLoadFactor,
super(initCapacity, maxLoadFactor, minLoadFactor);
head = null;
tail = null;
bookmark = new LinkedSetIterator();
}
public LightWeightLinkedSet() {
@ -111,6 +114,12 @@ protected boolean addElem(final T element) {
tail = le;
if (head == null) {
head = le;
bookmark.next = head;
}
// Update bookmark, if necessary.
if (bookmark.next == null) {
bookmark.next = le;
}
return true;
}
@ -141,6 +150,11 @@ protected DoubleLinkedElement<T> removeElem(final T key) {
if (tail == found) {
tail = tail.before;
}
// Update bookmark, if necessary.
if (found == this.bookmark.next) {
this.bookmark.next = found.after;
}
return found;
}
@ -262,5 +276,25 @@ public void clear() {
super.clear();
this.head = null;
this.tail = null;
this.resetBookmark();
}
/**
* Returns a new iterator starting at the bookmarked element.
*
* @return the iterator to the bookmarked element.
*/
public Iterator<T> getBookmark() {
LinkedSetIterator toRet = new LinkedSetIterator();
toRet.next = this.bookmark.next;
this.bookmark = toRet;
return toRet;
}
/**
* Resets the bookmark to the beginning of the list.
*/
public void resetBookmark() {
this.bookmark.next = this.head;
}
}

View File

@ -21,8 +21,11 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@ -31,6 +34,7 @@
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
@ -40,9 +44,13 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.Time;
@ -1004,4 +1012,185 @@ public void testGetReplWorkMultiplier() {
exception.expect(IllegalArgumentException.class);
blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
}
}
@Test(timeout = 60000)
public void testUpdateDoesNotCauseSkippedReplication() {
UnderReplicatedBlocks underReplicatedBlocks = new UnderReplicatedBlocks();
Block block1 = new Block(random.nextLong());
Block block2 = new Block(random.nextLong());
Block block3 = new Block(random.nextLong());
// Adding QUEUE_VERY_UNDER_REPLICATED block
final int block1CurReplicas = 2;
final int block1ExpectedReplicas = 7;
underReplicatedBlocks.add(block1, block1CurReplicas, 0,
block1ExpectedReplicas);
// Adding QUEUE_VERY_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 2, 0, 7);
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block3, 2, 0, 6);
List<List<Block>> chosenBlocks;
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
assertTheChosenBlocks(chosenBlocks, 0, 1, 0, 0, 0);
// Increasing the replications will move the block down a
// priority. This simulates a replica being completed in between checks.
underReplicatedBlocks.update(block1, block1CurReplicas+1, 0,
block1ExpectedReplicas, 1, 0);
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
// This block was moved up a priority and should not be skipped over.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
assertTheChosenBlocks(chosenBlocks, 0, 1, 0, 0, 0);
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_UNDER_REPLICATED.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
assertTheChosenBlocks(chosenBlocks, 0, 0, 1, 0, 0);
}
@Test(timeout = 60000)
public void testAddStoredBlockDoesNotCauseSkippedReplication()
throws IOException {
Namesystem mockNS = mock(Namesystem.class);
when(mockNS.isPopulatingReplQueues()).thenReturn(true);
when(mockNS.hasWriteLock()).thenReturn(true);
FSClusterStats mockStats = mock(FSClusterStats.class);
BlockManager bm =
new BlockManager(mockNS, mockStats, new HdfsConfiguration());
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
Block block1 = new Block(random.nextLong());
Block block2 = new Block(random.nextLong());
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block1, 0, 1, 1);
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 0, 1, 1);
List<List<Block>> chosenBlocks;
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
// Adding this block collection to the BlockManager, so that when we add the
// block under construction, the BlockManager will realize the expected
// replication has been achieved and remove it from the under-replicated
// queue.
BlockInfoUnderConstruction info = new BlockInfoUnderConstruction(block1, 1);
BlockCollection bc = mock(BlockCollection.class);
when(bc.getBlockReplication()).thenReturn((short)1);
bm.addBlockCollection(info, bc);
// Adding this block will increase its current replication, and that will
// remove it from the queue.
bm.addStoredBlockUnderConstruction(info,
TestReplicationPolicy.dataNodes[0], ReplicaState.FINALIZED);
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
// This block remains and should not be skipped over.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
}
@Test(timeout = 60000)
public void
testConvertLastBlockToUnderConstructionDoesNotCauseSkippedReplication()
throws IOException {
Namesystem mockNS = mock(Namesystem.class);
when(mockNS.isPopulatingReplQueues()).thenReturn(true);
FSClusterStats mockStats = mock(FSClusterStats.class);
BlockManager bm =
new BlockManager(mockNS, mockStats, new HdfsConfiguration());
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
Block block1 = new Block(random.nextLong());
Block block2 = new Block(random.nextLong());
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block1, 0, 1, 1);
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 0, 1, 1);
List<List<Block>> chosenBlocks;
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
final BlockInfo info = new BlockInfo(block1, 1);
final MutableBlockCollection mbc = mock(MutableBlockCollection.class);
when(mbc.getLastBlock()).thenReturn(info);
when(mbc.getPreferredBlockSize()).thenReturn(block1.getNumBytes() + 1);
when(mbc.getBlockReplication()).thenReturn((short)1);
ContentSummary cs = mock(ContentSummary.class);
when(cs.getLength()).thenReturn((long)1);
when(mbc.computeContentSummary()).thenReturn(cs);
info.setBlockCollection(mbc);
bm.addBlockCollection(info, mbc);
DatanodeDescriptor[] dnAry = {dataNodes[0]};
final BlockInfoUnderConstruction ucBlock =
info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
dnAry);
when(mbc.setLastBlock((BlockInfo) any(), (DatanodeDescriptor[]) any()))
.thenReturn(ucBlock);
bm.convertLastBlockToUnderConstruction(mbc);
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
// This block remains and should not be skipped over.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
}
@Test(timeout = 60000)
public void testupdateNeededReplicationsDoesNotCauseSkippedReplication()
throws IOException {
Namesystem mockNS = mock(Namesystem.class);
when(mockNS.isPopulatingReplQueues()).thenReturn(true);
FSClusterStats mockStats = mock(FSClusterStats.class);
BlockManager bm =
new BlockManager(mockNS, mockStats, new HdfsConfiguration());
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
Block block1 = new Block(random.nextLong());
Block block2 = new Block(random.nextLong());
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block1, 0, 1, 1);
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 0, 1, 1);
List<List<Block>> chosenBlocks;
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
bm.setReplication((short)0, (short)1, "", block1);
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
// This block remains and should not be skipped over.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
}
}

View File

@ -325,10 +325,19 @@ public void testClear() {
assertEquals(NUM, set.size());
assertFalse(set.isEmpty());
// Advance the bookmark.
Iterator<Integer> bkmrkIt = set.getBookmark();
for (int i=0; i<set.size()/2+1; i++) {
bkmrkIt.next();
}
assertTrue(bkmrkIt.hasNext());
// clear the set
set.clear();
assertEquals(0, set.size());
assertTrue(set.isEmpty());
bkmrkIt = set.getBookmark();
assertFalse(bkmrkIt.hasNext());
// poll should return an empty list
assertEquals(0, set.pollAll().size());
@ -363,4 +372,64 @@ public void testOther() {
LOG.info("Test capacity - DONE");
}
@Test(timeout=60000)
public void testGetBookmarkReturnsBookmarkIterator() {
LOG.info("Test getBookmark returns proper iterator");
assertTrue(set.addAll(list));
Iterator<Integer> bookmark = set.getBookmark();
assertEquals(bookmark.next(), list.get(0));
final int numAdvance = list.size()/2;
for(int i=1; i<numAdvance; i++) {
bookmark.next();
}
Iterator<Integer> bookmark2 = set.getBookmark();
assertEquals(bookmark2.next(), list.get(numAdvance));
}
@Test(timeout=60000)
public void testBookmarkAdvancesOnRemoveOfSameElement() {
LOG.info("Test that the bookmark advances if we remove its element.");
assertTrue(set.add(list.get(0)));
assertTrue(set.add(list.get(1)));
assertTrue(set.add(list.get(2)));
Iterator<Integer> it = set.getBookmark();
assertEquals(it.next(), list.get(0));
set.remove(list.get(1));
it = set.getBookmark();
assertEquals(it.next(), list.get(2));
}
@Test(timeout=60000)
public void testBookmarkSetToHeadOnAddToEmpty() {
LOG.info("Test bookmark is set after adding to previously empty set.");
Iterator<Integer> it = set.getBookmark();
assertFalse(it.hasNext());
set.add(list.get(0));
set.add(list.get(1));
it = set.getBookmark();
assertTrue(it.hasNext());
assertEquals(it.next(), list.get(0));
assertEquals(it.next(), list.get(1));
assertFalse(it.hasNext());
}
@Test(timeout=60000)
public void testResetBookmarkPlacesBookmarkAtHead() {
set.addAll(list);
Iterator<Integer> it = set.getBookmark();
final int numAdvance = set.size()/2;
for (int i=0; i<numAdvance; i++) {
it.next();
}
assertEquals(it.next(), list.get(numAdvance));
set.resetBookmark();
it = set.getBookmark();
assertEquals(it.next(), list.get(0));
}
}