HDFS-4366. Block Replication Policy Implementation May Skip Higher-Priority Blocks for Lower-Priority Blocks. (Derek Dagit and Zhe Zhang via wang)

This commit is contained in:
Andrew Wang 2015-06-22 14:18:24 -07:00
parent c4747158b7
commit cd1e0930dc
6 changed files with 331 additions and 41 deletions

View File

@ -597,6 +597,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8337. Accessing httpfs via webhdfs doesn't work from a jar with
kerberos. (Yongjun Zhang)
HDFS-4366. Block Replication Policy Implementation May Skip Higher-Priority
Blocks for Lower-Priority Blocks (Derek Dagit via kihwal)
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -1347,7 +1347,6 @@ public class BlockManager {
// abandoned block or block reopened for append
if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
neededReplications.remove(block, priority); // remove from neededReplications
neededReplications.decrementReplicationIndex(priority);
continue;
}
@ -1377,7 +1376,6 @@ public class BlockManager {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
(blockHasEnoughRacks(block)) ) {
neededReplications.remove(block, priority); // remove from neededReplications
neededReplications.decrementReplicationIndex(priority);
blockLog.info("BLOCK* Removing {} from neededReplications as" +
" it has enough replicas", block);
continue;
@ -1434,7 +1432,6 @@ public class BlockManager {
if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) {
neededReplications.remove(block, priority); // remove from neededReplications
rw.targets = null;
neededReplications.decrementReplicationIndex(priority);
continue;
}
requiredReplication = bc.getPreferredBlockReplication();
@ -1448,7 +1445,6 @@ public class BlockManager {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
(blockHasEnoughRacks(block)) ) {
neededReplications.remove(block, priority); // remove from neededReplications
neededReplications.decrementReplicationIndex(priority);
rw.targets = null;
blockLog.info("BLOCK* Removing {} from neededReplications as" +
" it has enough replicas", block);
@ -1481,7 +1477,6 @@ public class BlockManager {
// remove from neededReplications
if(numEffectiveReplicas + targets.length >= requiredReplication) {
neededReplications.remove(block, priority); // remove from neededReplications
neededReplications.decrementReplicationIndex(priority);
}
}
}

View File

@ -18,12 +18,9 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@ -82,11 +79,9 @@ class UnderReplicatedBlocks implements Iterable<Block> {
/** The queue for corrupt blocks: {@value} */
static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
/** the queues themselves */
private final List<LightWeightLinkedSet<Block>> priorityQueues
= new ArrayList<LightWeightLinkedSet<Block>>();
private List<LightWeightLinkedSet<Block>> priorityQueues
= new ArrayList<LightWeightLinkedSet<Block>>(LEVEL);
/** Stores the replication index for each priority */
private Map<Integer, Integer> priorityToReplIdx = new HashMap<Integer, Integer>(LEVEL);
/** The number of corrupt blocks with replication factor 1 */
private int corruptReplOneBlocks = 0;
@ -94,7 +89,6 @@ class UnderReplicatedBlocks implements Iterable<Block> {
UnderReplicatedBlocks() {
for (int i = 0; i < LEVEL; i++) {
priorityQueues.add(new LightWeightLinkedSet<Block>());
priorityToReplIdx.put(i, 0);
}
}
@ -328,16 +322,18 @@ class UnderReplicatedBlocks implements Iterable<Block> {
}
}
}
/**
* Get a list of block lists to be replicated. The index of block lists
* represents its replication priority. Replication index will be tracked for
* each priority list separately in priorityToReplIdx map. Iterates through
* all priority lists and find the elements after replication index. Once the
* last priority lists reaches to end, all replication indexes will be set to
* 0 and start from 1st priority list to fulfill the blockToProces count.
*
* @param blocksToProcess - number of blocks to fetch from underReplicated blocks.
* represents its replication priority. Iterates each block list in priority
* order beginning with the highest priority list. Iterators use a bookmark to
* resume where the previous iteration stopped. Returns when the block count
* is met or iteration reaches the end of the lowest priority list, in which
* case bookmarks for each block list are reset to the heads of their
* respective lists.
*
* @param blocksToProcess - number of blocks to fetch from underReplicated
* blocks.
* @return Return a list of block lists to be replicated. The block list index
* represents its replication priority.
*/
@ -357,12 +353,8 @@ class UnderReplicatedBlocks implements Iterable<Block> {
for (int priority = 0; priority < LEVEL; priority++) {
// Go through all blocks that need replications with current priority.
BlockIterator neededReplicationsIterator = iterator(priority);
Integer replIndex = priorityToReplIdx.get(priority);
// skip to the first unprocessed block, which is at replIndex
for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
neededReplicationsIterator.next();
}
// Set the iterator to the first unprocessed block at this priority level.
neededReplicationsIterator.setToBookmark();
blocksToProcess = Math.min(blocksToProcess, size());
@ -375,20 +367,18 @@ class UnderReplicatedBlocks implements Iterable<Block> {
&& neededReplicationsIterator.hasNext()) {
Block block = neededReplicationsIterator.next();
blocksToReplicate.get(priority).add(block);
replIndex++;
blockCount++;
}
if (!neededReplicationsIterator.hasNext()
&& neededReplicationsIterator.getPriority() == LEVEL - 1) {
// reset all priorities replication index to 0 because there is no
// recently added blocks in any list.
// Reset all priorities' bookmarks to the beginning because there were
// no recently added blocks in any list.
for (int i = 0; i < LEVEL; i++) {
priorityToReplIdx.put(i, 0);
this.priorityQueues.get(i).resetBookmark();
}
break;
}
priorityToReplIdx.put(priority, replIndex);
}
return blocksToReplicate;
}
@ -471,15 +461,19 @@ class UnderReplicatedBlocks implements Iterable<Block> {
int getPriority() {
return level;
}
}
/**
* This method is to decrement the replication index for the given priority
*
* @param priority - int priority level
*/
public void decrementReplicationIndex(int priority) {
Integer replIdx = priorityToReplIdx.get(priority);
priorityToReplIdx.put(priority, --replIdx);
/**
* Sets iterator(s) to bookmarked elements.
*/
private synchronized void setToBookmark() {
if (this.isIteratorForLevel) {
this.iterators.set(0, priorityQueues.get(this.level)
.getBookmark());
} else {
for (int i = 0; i < LEVEL; i++) {
this.iterators.set(i, priorityQueues.get(i).getBookmark());
}
}
}
}
}

View File

@ -56,6 +56,8 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
private DoubleLinkedElement<T> head;
private DoubleLinkedElement<T> tail;
private LinkedSetIterator bookmark;
/**
* @param initCapacity
* Recommended size of the internal array.
@ -69,6 +71,7 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
super(initCapacity, maxLoadFactor, minLoadFactor);
head = null;
tail = null;
bookmark = new LinkedSetIterator();
}
public LightWeightLinkedSet() {
@ -111,6 +114,12 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
tail = le;
if (head == null) {
head = le;
bookmark.next = head;
}
// Update bookmark, if necessary.
if (bookmark.next == null) {
bookmark.next = le;
}
return true;
}
@ -141,6 +150,11 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
if (tail == found) {
tail = tail.before;
}
// Update bookmark, if necessary.
if (found == this.bookmark.next) {
this.bookmark.next = found.after;
}
return found;
}
@ -262,5 +276,25 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
super.clear();
this.head = null;
this.tail = null;
this.resetBookmark();
}
/**
* Returns a new iterator starting at the bookmarked element.
*
* @return the iterator to the bookmarked element.
*/
public Iterator<T> getBookmark() {
LinkedSetIterator toRet = new LinkedSetIterator();
toRet.next = this.bookmark.next;
this.bookmark = toRet;
return toRet;
}
/**
* Resets the bookmark to the beginning of the list.
*/
public void resetBookmark() {
this.bookmark.next = this.head;
}
}

View File

@ -27,6 +27,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@ -38,6 +39,7 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -50,6 +52,7 @@ import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@ -1100,4 +1103,196 @@ public class TestReplicationPolicy {
exception.expect(IllegalArgumentException.class);
blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
}
@Test(timeout = 60000)
public void testUpdateDoesNotCauseSkippedReplication() {
UnderReplicatedBlocks underReplicatedBlocks = new UnderReplicatedBlocks();
Block block1 = new Block(ThreadLocalRandom.current().nextLong());
Block block2 = new Block(ThreadLocalRandom.current().nextLong());
Block block3 = new Block(ThreadLocalRandom.current().nextLong());
// Adding QUEUE_VERY_UNDER_REPLICATED block
final int block1CurReplicas = 2;
final int block1ExpectedReplicas = 7;
underReplicatedBlocks.add(block1, block1CurReplicas, 0,
block1ExpectedReplicas);
// Adding QUEUE_VERY_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 2, 0, 7);
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block3, 2, 0, 6);
List<List<Block>> chosenBlocks;
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
assertTheChosenBlocks(chosenBlocks, 0, 1, 0, 0, 0);
// Increasing the replications will move the block down a
// priority. This simulates a replica being completed in between checks.
underReplicatedBlocks.update(block1, block1CurReplicas+1, 0,
block1ExpectedReplicas, 1, 0);
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
// This block was moved up a priority and should not be skipped over.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
assertTheChosenBlocks(chosenBlocks, 0, 1, 0, 0, 0);
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_UNDER_REPLICATED.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
assertTheChosenBlocks(chosenBlocks, 0, 0, 1, 0, 0);
}
@Test(timeout = 60000)
public void testAddStoredBlockDoesNotCauseSkippedReplication()
throws IOException {
Namesystem mockNS = mock(Namesystem.class);
when(mockNS.isPopulatingReplQueues()).thenReturn(true);
when(mockNS.hasWriteLock()).thenReturn(true);
BlockManager bm =
new BlockManager(mockNS, new HdfsConfiguration());
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
Block block1 = new Block(ThreadLocalRandom.current().nextLong());
Block block2 = new Block(ThreadLocalRandom.current().nextLong());
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block1, 0, 1, 1);
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 0, 1, 1);
List<List<Block>> chosenBlocks;
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
// Adding this block collection to the BlockManager, so that when we add the
// block under construction, the BlockManager will realize the expected
// replication has been achieved and remove it from the under-replicated
// queue.
BlockInfoUnderConstruction info = new BlockInfoUnderConstructionContiguous(block1, (short)1);
BlockCollection bc = mock(BlockCollection.class);
when(bc.getPreferredBlockReplication()).thenReturn((short)1);
bm.addBlockCollection(info, bc);
StatefulBlockInfo statefulBlockInfo = new StatefulBlockInfo(info,
block1, ReplicaState.RBW);
// Adding this block will increase its current replication, and that will
// remove it from the queue.
bm.addStoredBlockUnderConstruction(statefulBlockInfo,
TestReplicationPolicy.storages[0]);
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
// This block remains and should not be skipped over.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
}
@Test(timeout = 60000)
public void
testConvertLastBlockToUnderConstructionDoesNotCauseSkippedReplication()
throws IOException {
Namesystem mockNS = mock(Namesystem.class);
when(mockNS.isPopulatingReplQueues()).thenReturn(true);
BlockManager bm =
new BlockManager(mockNS, new HdfsConfiguration());
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
Block block1 = new Block(ThreadLocalRandom.current().nextLong());
Block block2 = new Block(ThreadLocalRandom.current().nextLong());
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block1, 0, 1, 1);
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 0, 1, 1);
List<List<Block>> chosenBlocks;
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
final BlockInfo info = new BlockInfoContiguous(block1, (short) 1);
final BlockCollection mbc = mock(BlockCollection.class);
when(mbc.getLastBlock()).thenReturn(info);
when(mbc.getPreferredBlockSize()).thenReturn(block1.getNumBytes() + 1);
when(mbc.getPreferredBlockReplication()).thenReturn((short)1);
ContentSummary cs = mock(ContentSummary.class);
when(cs.getLength()).thenReturn((long)1);
when(mbc.computeContentSummary(bm.getStoragePolicySuite())).thenReturn(cs);
info.setBlockCollection(mbc);
bm.addBlockCollection(info, mbc);
DatanodeStorageInfo[] dnAry = {storages[0]};
final BlockInfoUnderConstruction ucBlock =
info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
dnAry);
DatanodeStorageInfo storage = mock(DatanodeStorageInfo.class);
DatanodeDescriptor dn = mock(DatanodeDescriptor.class);
when(dn.isDecommissioned()).thenReturn(true);
when(storage.getState()).thenReturn(DatanodeStorage.State.NORMAL);
when(storage.getDatanodeDescriptor()).thenReturn(dn);
when(storage.removeBlock(any(BlockInfo.class))).thenReturn(true);
when(storage.addBlock(any(BlockInfo.class))).thenReturn
(DatanodeStorageInfo.AddBlockResult.ADDED);
ucBlock.addStorage(storage);
when(mbc.setLastBlock((BlockInfo) any(), (DatanodeStorageInfo[]) any()))
.thenReturn(ucBlock);
bm.convertLastBlockToUnderConstruction(mbc, 0);
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
// This block remains and should not be skipped over.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
}
@Test(timeout = 60000)
public void testupdateNeededReplicationsDoesNotCauseSkippedReplication()
throws IOException {
Namesystem mockNS = mock(Namesystem.class);
when(mockNS.isPopulatingReplQueues()).thenReturn(true);
BlockManager bm =
new BlockManager(mockNS, new HdfsConfiguration());
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
Block block1 = new Block(ThreadLocalRandom.current().nextLong());
Block block2 = new Block(ThreadLocalRandom.current().nextLong());
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block1, 0, 1, 1);
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 0, 1, 1);
List<List<Block>> chosenBlocks;
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
bm.setReplication((short)0, (short)1, "", block1);
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
// This block remains and should not be skipped over.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
}
}

View File

@ -325,10 +325,19 @@ public class TestLightWeightLinkedSet {
assertEquals(NUM, set.size());
assertFalse(set.isEmpty());
// Advance the bookmark.
Iterator<Integer> bkmrkIt = set.getBookmark();
for (int i=0; i<set.size()/2+1; i++) {
bkmrkIt.next();
}
assertTrue(bkmrkIt.hasNext());
// clear the set
set.clear();
assertEquals(0, set.size());
assertTrue(set.isEmpty());
bkmrkIt = set.getBookmark();
assertFalse(bkmrkIt.hasNext());
// poll should return an empty list
assertEquals(0, set.pollAll().size());
@ -363,4 +372,64 @@ public class TestLightWeightLinkedSet {
LOG.info("Test capacity - DONE");
}
@Test(timeout=60000)
public void testGetBookmarkReturnsBookmarkIterator() {
LOG.info("Test getBookmark returns proper iterator");
assertTrue(set.addAll(list));
Iterator<Integer> bookmark = set.getBookmark();
assertEquals(bookmark.next(), list.get(0));
final int numAdvance = list.size()/2;
for(int i=1; i<numAdvance; i++) {
bookmark.next();
}
Iterator<Integer> bookmark2 = set.getBookmark();
assertEquals(bookmark2.next(), list.get(numAdvance));
}
@Test(timeout=60000)
public void testBookmarkAdvancesOnRemoveOfSameElement() {
LOG.info("Test that the bookmark advances if we remove its element.");
assertTrue(set.add(list.get(0)));
assertTrue(set.add(list.get(1)));
assertTrue(set.add(list.get(2)));
Iterator<Integer> it = set.getBookmark();
assertEquals(it.next(), list.get(0));
set.remove(list.get(1));
it = set.getBookmark();
assertEquals(it.next(), list.get(2));
}
@Test(timeout=60000)
public void testBookmarkSetToHeadOnAddToEmpty() {
LOG.info("Test bookmark is set after adding to previously empty set.");
Iterator<Integer> it = set.getBookmark();
assertFalse(it.hasNext());
set.add(list.get(0));
set.add(list.get(1));
it = set.getBookmark();
assertTrue(it.hasNext());
assertEquals(it.next(), list.get(0));
assertEquals(it.next(), list.get(1));
assertFalse(it.hasNext());
}
@Test(timeout=60000)
public void testResetBookmarkPlacesBookmarkAtHead() {
set.addAll(list);
Iterator<Integer> it = set.getBookmark();
final int numAdvance = set.size()/2;
for (int i=0; i<numAdvance; i++) {
it.next();
}
assertEquals(it.next(), list.get(numAdvance));
set.resetBookmark();
it = set.getBookmark();
assertEquals(it.next(), list.get(0));
}
}