From ccdb978603696a91c4828a66aad27f585543b376 Mon Sep 17 00:00:00 2001
From: Kihwal Lee
Date: Mon, 5 Aug 2013 20:48:59 +0000
Subject: [PATCH] HDFS-4366. Block Replication Policy Implementation May Skip
 Higher-Priority Blocks for Lower-Priority Blocks. Contributed by Derek Dagit.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1510724 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |   3 +
 .../server/blockmanagement/BlockManager.java  |   5 -
 .../UnderReplicatedBlocks.java                |  63 +++---
 .../hdfs/util/LightWeightLinkedSet.java       |  34 +++
 .../TestReplicationPolicy.java                | 193 +++++++++++++++++-
 .../hdfs/util/TestLightWeightLinkedSet.java   |  69 +++++++
 6 files changed, 325 insertions(+), 42 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 9da190021fb..49b2bd2db53 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -262,6 +262,9 @@ Release 2.3.0 - UNRELEASED
     HDFS-5035. getFileLinkStatus and rename do not correctly check permissions
     of symlinks. (Andrew Wang via Colin Patrick McCabe)
 
+    HDFS-4366. Block Replication Policy Implementation May Skip Higher-Priority
+    Blocks for Lower-Priority Blocks (Derek Dagit via kihwal)
+
 Release 2.1.1-beta - UNRELEASED
 
   INCOMPATIBLE CHANGES
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 7f3e2842957..512a052ddb5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1208,7 +1208,6 @@ public class BlockManager {
           // abandoned block or block reopened for append
           if(bc == null || bc instanceof MutableBlockCollection) {
             neededReplications.remove(block, priority); // remove from neededReplications
-            neededReplications.decrementReplicationIndex(priority);
             continue;
           }
 
@@ -1235,7 +1234,6 @@ public class BlockManager {
           if ( (pendingReplications.getNumReplicas(block) > 0) ||
                (blockHasEnoughRacks(block)) ) {
             neededReplications.remove(block, priority); // remove from neededReplications
-            neededReplications.decrementReplicationIndex(priority);
             blockLog.info("BLOCK* Removing " + block
                 + " from neededReplications as it has enough replicas");
             continue;
@@ -1295,7 +1293,6 @@ public class BlockManager {
         if(bc == null || bc instanceof MutableBlockCollection) {
           neededReplications.remove(block, priority); // remove from neededReplications
           rw.targets = null;
-          neededReplications.decrementReplicationIndex(priority);
           continue;
         }
         requiredReplication = bc.getBlockReplication();
@@ -1309,7 +1306,6 @@ public class BlockManager {
         if ( (pendingReplications.getNumReplicas(block) > 0) ||
              (blockHasEnoughRacks(block)) ) {
           neededReplications.remove(block, priority); // remove from neededReplications
-          neededReplications.decrementReplicationIndex(priority);
           rw.targets = null;
           blockLog.info("BLOCK* Removing " + block
               + " from neededReplications as it has enough replicas");
@@ -1346,7 +1342,6 @@ public class BlockManager {
           // remove from neededReplications
           if(numEffectiveReplicas + targets.length >= requiredReplication) {
             neededReplications.remove(block, priority); // remove from neededReplications
-            neededReplications.decrementReplicationIndex(priority);
           }
         }
       }
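Context for the five deletions above: decrementReplicationIndex() existed to patch up a saved integer cursor whenever a block was removed from a priority queue. Any removal path that forgot the compensating decrement left the cursor pointing one element too far, so the next scan silently skipped a block; that is the skipped-replication bug named in this patch's title. A minimal, self-contained sketch of the failure mode (hypothetical code using plain java.util lists, not HDFS classes):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class IndexSkipDemo {
      public static void main(String[] args) {
        List<String> queue =
            new ArrayList<String>(Arrays.asList("b1", "b2", "b3", "b4"));

        // Pass 1 processes "b1" and saves its resume position:
        // replIndex = 1, meaning "b2" is the next unprocessed element.
        int replIndex = 1;

        // "b1" finishes replicating and is removed. Every later element
        // shifts left, but suppose a call site forgets the decrement.
        queue.remove("b1");

        // Pass 2 resumes at replIndex and silently skips "b2".
        System.out.println("pass 2 resumes at " + queue.get(replIndex)); // b3
      }
    }

The bookmark introduced in the files below makes removal self-adjusting, which is why the call sites no longer need these compensating decrements.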
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
index 779b445cece..555cec40fd3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
@@ -18,11 +18,8 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -82,16 +79,12 @@ class UnderReplicatedBlocks implements Iterable<Block> {
   static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
   /** the queues themselves */
   private List<LightWeightLinkedSet<Block>> priorityQueues
-      = new ArrayList<LightWeightLinkedSet<Block>>();
+      = new ArrayList<LightWeightLinkedSet<Block>>(LEVEL);
 
-  /** Stores the replication index for each priority */
-  private Map<Integer, Integer> priorityToReplIdx =
-      new HashMap<Integer, Integer>(LEVEL);
-
   /** Create an object. */
   UnderReplicatedBlocks() {
     for (int i = 0; i < LEVEL; i++) {
       priorityQueues.add(new LightWeightLinkedSet<Block>());
-      priorityToReplIdx.put(i, 0);
     }
   }
 
@@ -310,13 +303,15 @@ class UnderReplicatedBlocks implements Iterable<Block> {
   /**
    * Get a list of block lists to be replicated. The index of block lists
-   * represents its replication priority. Replication index will be tracked for
-   * each priority list separately in priorityToReplIdx map. Iterates through
-   * all priority lists and find the elements after replication index. Once the
-   * last priority lists reaches to end, all replication indexes will be set to
-   * 0 and start from 1st priority list to fulfill the blockToProces count.
-   *
-   * @param blocksToProcess - number of blocks to fetch from underReplicated blocks.
+   * represents its replication priority. Iterates each block list in priority
+   * order beginning with the highest priority list. Iterators use a bookmark to
+   * resume where the previous iteration stopped. Returns when the block count
+   * is met or iteration reaches the end of the lowest priority list, in which
+   * case bookmarks for each block list are reset to the heads of their
+   * respective lists.
+   *
+   * @param blocksToProcess - number of blocks to fetch from underReplicated
+   *          blocks.
    * @return Return a list of block lists to be replicated. The block list index
    *         represents its replication priority.
    */
@@ -336,12 +331,8 @@ class UnderReplicatedBlocks implements Iterable<Block> {
     for (int priority = 0; priority < LEVEL; priority++) {
       // Go through all blocks that need replications with current priority.
       BlockIterator neededReplicationsIterator = iterator(priority);
-      Integer replIndex = priorityToReplIdx.get(priority);
-
-      // skip to the first unprocessed block, which is at replIndex
-      for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
-        neededReplicationsIterator.next();
-      }
+      // Set the iterator to the first unprocessed block at this priority level.
+      neededReplicationsIterator.setToBookmark();
 
       blocksToProcess = Math.min(blocksToProcess, size());
 
       // Loop through all remaining blocks in the list.
       while (blockCount < blocksToProcess
           && neededReplicationsIterator.hasNext()) {
         Block block = neededReplicationsIterator.next();
         blocksToReplicate.get(priority).add(block);
-        replIndex++;
         blockCount++;
       }
 
       if (!neededReplicationsIterator.hasNext()
           && neededReplicationsIterator.getPriority() == LEVEL - 1) {
-        // reset all priorities replication index to 0 because there is no
-        // recently added blocks in any list.
+        // Reset all priorities' bookmarks to the beginning because there were
+        // no recently added blocks in any list.
         for (int i = 0; i < LEVEL; i++) {
-          priorityToReplIdx.put(i, 0);
+          this.priorityQueues.get(i).resetBookmark();
         }
         break;
       }
-      priorityToReplIdx.put(priority, replIndex);
     }
     return blocksToReplicate;
   }
@@ -450,15 +439,19 @@ class UnderReplicatedBlocks implements Iterable<Block> {
     int getPriority() {
       return level;
     }
-  }
 
-  /**
-   * This method is to decrement the replication index for the given priority
-   *
-   * @param priority - int priority level
-   */
-  public void decrementReplicationIndex(int priority) {
-    Integer replIdx = priorityToReplIdx.get(priority);
-    priorityToReplIdx.put(priority, --replIdx);
+    /**
+     * Sets iterator(s) to bookmarked elements.
+     */
+    private synchronized void setToBookmark() {
+      if (this.isIteratorForLevel) {
+        this.iterators.set(0, priorityQueues.get(this.level)
+            .getBookmark());
+      } else {
+        for (int i = 0; i < LEVEL; i++) {
+          this.iterators.set(i, priorityQueues.get(i).getBookmark());
+        }
+      }
+    }
   }
 }
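The setToBookmark()/resetBookmark() calls above rely on a bookmark maintained inside LightWeightLinkedSet, which the next file adds: getBookmark() returns a fresh iterator positioned at the bookmarked element and makes that iterator the new bookmark, removal of the bookmarked element advances the bookmark, and resetBookmark() rewinds it to the head. An illustrative usage sketch of that contract, mirroring the new unit tests at the end of this patch (the driver class is hypothetical and assumes the patched LightWeightLinkedSet is on the classpath):

    import java.util.Iterator;
    import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;

    public class BookmarkDemo {
      public static void main(String[] args) {
        LightWeightLinkedSet<Integer> set = new LightWeightLinkedSet<Integer>();
        for (int i = 0; i < 4; i++) {
          set.add(i);
        }

        // The set's bookmark follows the iterator returned by getBookmark(),
        // so the scan position survives across calls.
        Iterator<Integer> it = set.getBookmark();
        it.next(); // 0
        it.next(); // 1

        // A fresh bookmark iterator resumes where the last one stopped.
        Iterator<Integer> resumed = set.getBookmark();
        System.out.println(resumed.next()); // 2

        // Removing the element the bookmark points at advances the bookmark
        // past it instead of leaving a dangling reference.
        set.remove(3);

        // resetBookmark() rewinds to the head for a full re-scan.
        set.resetBookmark();
        System.out.println(set.getBookmark().next()); // 0
      }
    }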
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java
@@ -59,6 +59,8 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
   private DoubleLinkedElement<T> head;
   private DoubleLinkedElement<T> tail;
 
+  private LinkedSetIterator bookmark;
+
   /**
    * @param initCapacity
    *          Recommended size of the internal array.
@@ -69,6 +71,7 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
     super(initCapacity, maxLoadFactor, minLoadFactor);
     head = null;
     tail = null;
+    bookmark = new LinkedSetIterator();
   }
 
   public LightWeightLinkedSet() {
@@ -111,6 +114,12 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
     tail = le;
     if (head == null) {
       head = le;
+      bookmark.next = head;
+    }
+
+    // Update bookmark, if necessary.
+    if (bookmark.next == null) {
+      bookmark.next = le;
     }
     return true;
   }
@@ -141,6 +150,11 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
     if (tail == found) {
       tail = tail.before;
     }
+
+    // Update bookmark, if necessary.
+    if (found == this.bookmark.next) {
+      this.bookmark.next = found.after;
+    }
     return found;
   }
@@ -262,5 +276,25 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
     super.clear();
     this.head = null;
     this.tail = null;
+    this.resetBookmark();
+  }
+
+  /**
+   * Returns a new iterator starting at the bookmarked element.
+   *
+   * @return the iterator to the bookmarked element.
+   */
+  public Iterator<T> getBookmark() {
+    LinkedSetIterator toRet = new LinkedSetIterator();
+    toRet.next = this.bookmark.next;
+    this.bookmark = toRet;
+    return toRet;
+  }
+
+  /**
+   * Resets the bookmark to the beginning of the list.
+   */
+  public void resetBookmark() {
+    this.bookmark.next = this.head;
+  }
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index 0322eefad78..ba6c3737266 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -21,8 +21,11 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -31,6 +34,7 @@ import java.util.Map;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -40,9 +44,13 @@ import org.apache.hadoop.hdfs.LogVerificationAppender;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.Time;
@@ -1004,4 +1012,185 @@ public class TestReplicationPolicy {
     exception.expect(IllegalArgumentException.class);
     blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
   }
-}
+
+  @Test(timeout = 60000)
+  public void testUpdateDoesNotCauseSkippedReplication() {
+    UnderReplicatedBlocks underReplicatedBlocks = new UnderReplicatedBlocks();
+
+    Block block1 = new Block(random.nextLong());
+    Block block2 = new Block(random.nextLong());
+    Block block3 = new Block(random.nextLong());
+
+    // Adding QUEUE_VERY_UNDER_REPLICATED block
+    final int block1CurReplicas = 2;
+    final int block1ExpectedReplicas = 7;
+    underReplicatedBlocks.add(block1, block1CurReplicas, 0,
+        block1ExpectedReplicas);
+
+    // Adding QUEUE_VERY_UNDER_REPLICATED block
+    underReplicatedBlocks.add(block2, 2, 0, 7);
+
+    // Adding QUEUE_UNDER_REPLICATED block
+    underReplicatedBlocks.add(block3, 2, 0, 6);
+
+    List<List<Block>> chosenBlocks;
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_VERY_UNDER_REPLICATED.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 0, 1, 0, 0, 0);
+
+    // Increasing the replications will move the block down a
+    // priority. This simulates a replica being completed in between checks.
+    underReplicatedBlocks.update(block1, block1CurReplicas+1, 0,
+        block1ExpectedReplicas, 1, 0);
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_VERY_UNDER_REPLICATED.
+    // The block remaining at this priority was not updated and should not be
+    // skipped over.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 0, 1, 0, 0, 0);
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_UNDER_REPLICATED.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 0, 0, 1, 0, 0);
+  }
+
+  @Test(timeout = 60000)
+  public void testAddStoredBlockDoesNotCauseSkippedReplication()
+      throws IOException {
+    Namesystem mockNS = mock(Namesystem.class);
+    when(mockNS.isPopulatingReplQueues()).thenReturn(true);
+    when(mockNS.hasWriteLock()).thenReturn(true);
+    FSClusterStats mockStats = mock(FSClusterStats.class);
+    BlockManager bm =
+        new BlockManager(mockNS, mockStats, new HdfsConfiguration());
+    UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
+
+    Block block1 = new Block(random.nextLong());
+    Block block2 = new Block(random.nextLong());
+
+    // Adding QUEUE_HIGHEST_PRIORITY block (zero live replicas, one
+    // decommissioned replica)
+    underReplicatedBlocks.add(block1, 0, 1, 1);
+
+    // Adding QUEUE_HIGHEST_PRIORITY block
+    underReplicatedBlocks.add(block2, 0, 1, 1);
+
+    List<List<Block>> chosenBlocks;
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_HIGHEST_PRIORITY.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
+
+    // Adding this block collection to the BlockManager, so that when we add
+    // the block under construction, the BlockManager will realize the expected
+    // replication has been achieved and remove it from the under-replicated
+    // queue.
+    BlockInfoUnderConstruction info = new BlockInfoUnderConstruction(block1, 1);
+    BlockCollection bc = mock(BlockCollection.class);
+    when(bc.getBlockReplication()).thenReturn((short)1);
+    bm.addBlockCollection(info, bc);
+
+    // Adding this block will increase its current replication, and that will
+    // remove it from the queue.
+    bm.addStoredBlockUnderConstruction(info,
+        TestReplicationPolicy.dataNodes[0], ReplicaState.FINALIZED);
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_HIGHEST_PRIORITY.
+    // This block remains and should not be skipped over.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
+  }
+
+  @Test(timeout = 60000)
+  public void
+      testConvertLastBlockToUnderConstructionDoesNotCauseSkippedReplication()
+      throws IOException {
+    Namesystem mockNS = mock(Namesystem.class);
+    when(mockNS.isPopulatingReplQueues()).thenReturn(true);
+    FSClusterStats mockStats = mock(FSClusterStats.class);
+    BlockManager bm =
+        new BlockManager(mockNS, mockStats, new HdfsConfiguration());
+    UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
+
+    Block block1 = new Block(random.nextLong());
+    Block block2 = new Block(random.nextLong());
+
+    // Adding QUEUE_HIGHEST_PRIORITY block
+    underReplicatedBlocks.add(block1, 0, 1, 1);
+
+    // Adding QUEUE_HIGHEST_PRIORITY block
+    underReplicatedBlocks.add(block2, 0, 1, 1);
+
+    List<List<Block>> chosenBlocks;
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_HIGHEST_PRIORITY.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
+
+    final BlockInfo info = new BlockInfo(block1, 1);
+    final MutableBlockCollection mbc = mock(MutableBlockCollection.class);
+    when(mbc.getLastBlock()).thenReturn(info);
+    when(mbc.getPreferredBlockSize()).thenReturn(block1.getNumBytes() + 1);
+    when(mbc.getBlockReplication()).thenReturn((short)1);
+    ContentSummary cs = mock(ContentSummary.class);
+    when(cs.getLength()).thenReturn((long)1);
+    when(mbc.computeContentSummary()).thenReturn(cs);
+    info.setBlockCollection(mbc);
+    bm.addBlockCollection(info, mbc);
+
+    DatanodeDescriptor[] dnAry = {dataNodes[0]};
+    final BlockInfoUnderConstruction ucBlock =
+        info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,
+            dnAry);
+    when(mbc.setLastBlock((BlockInfo) any(), (DatanodeDescriptor[]) any()))
+        .thenReturn(ucBlock);
+
+    bm.convertLastBlockToUnderConstruction(mbc);
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_HIGHEST_PRIORITY.
+    // This block remains and should not be skipped over.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
+  }
+
+  @Test(timeout = 60000)
+  public void testupdateNeededReplicationsDoesNotCauseSkippedReplication()
+      throws IOException {
+    Namesystem mockNS = mock(Namesystem.class);
+    when(mockNS.isPopulatingReplQueues()).thenReturn(true);
+    FSClusterStats mockStats = mock(FSClusterStats.class);
+    BlockManager bm =
+        new BlockManager(mockNS, mockStats, new HdfsConfiguration());
+    UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
+
+    Block block1 = new Block(random.nextLong());
+    Block block2 = new Block(random.nextLong());
+
+    // Adding QUEUE_HIGHEST_PRIORITY block
+    underReplicatedBlocks.add(block1, 0, 1, 1);
+
+    // Adding QUEUE_HIGHEST_PRIORITY block
+    underReplicatedBlocks.add(block2, 0, 1, 1);
+
+    List<List<Block>> chosenBlocks;
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_HIGHEST_PRIORITY.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
+
+    bm.setReplication((short)0, (short)1, "", block1);
+
+    // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
+    // from QUEUE_HIGHEST_PRIORITY.
+    // This block remains and should not be skipped over.
+    chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
+    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
+  }
+}
\ No newline at end of file
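The four tests above assert per-priority pick counts through assertTheChosenBlocks, a helper defined elsewhere in TestReplicationPolicy and not part of this diff. The call sites imply a shape roughly like the following, where argument i is the expected number of blocks chosen at priority i (hypothetical reconstruction relying on the file's existing static import of assertEquals, not the actual helper):

    private static void assertTheChosenBlocks(List<List<Block>> chosenBlocks,
        int... expectedCountPerPriority) {
      for (int i = 0; i < expectedCountPerPriority.length; i++) {
        // One entry per priority level; compare chosen counts at level i.
        assertEquals("Wrong number of blocks chosen at priority " + i,
            expectedCountPerPriority[i], chosenBlocks.get(i).size());
      }
    }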
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java
index 1ccbccf53f5..2d306532596 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java
@@ -325,10 +325,19 @@ public class TestLightWeightLinkedSet {
     assertEquals(NUM, set.size());
     assertFalse(set.isEmpty());
 
+    // Advance the bookmark.
+    Iterator<Integer> bkmrkIt = set.getBookmark();
+    for (int i=0; i<NUM/2+1; i++) {
+      bkmrkIt.next();
+    }
+    assertTrue(bkmrkIt.hasNext());
+
     // clear the set
     set.clear();
     assertEquals(0, set.size());
     assertTrue(set.isEmpty());
+    bkmrkIt = set.getBookmark();
+    assertFalse(bkmrkIt.hasNext());
 
     LOG.info("Test clear - DONE");
   }
@@ -363,4 +372,64 @@ public class TestLightWeightLinkedSet {
     LOG.info("Test capacity - DONE");
   }
 
+  @Test(timeout=60000)
+  public void testGetBookmarkReturnsBookmarkIterator() {
+    LOG.info("Test getBookmark returns proper iterator");
+    assertTrue(set.addAll(list));
+
+    Iterator<Integer> bookmark = set.getBookmark();
+    assertEquals(bookmark.next(), list.get(0));
+
+    final int numAdvance = list.size()/2;
+    for(int i=1; i<numAdvance; i++) {
+      bookmark.next();
+    }
+
+    Iterator<Integer> bookmark2 = set.getBookmark();
+    assertEquals(bookmark2.next(), list.get(numAdvance));
+  }
+
+  @Test(timeout=60000)
+  public void testBookmarkAdvancesOnRemoveOfSameElement() {
+    LOG.info("Test that the bookmark advances if we remove its element.");
+    assertTrue(set.add(list.get(0)));
+    assertTrue(set.add(list.get(1)));
+    assertTrue(set.add(list.get(2)));
+
+    Iterator<Integer> it = set.getBookmark();
+    assertEquals(it.next(), list.get(0));
+    set.remove(list.get(1));
+    it = set.getBookmark();
+    assertEquals(it.next(), list.get(2));
+  }
+
+  @Test(timeout=60000)
+  public void testBookmarkSetToHeadOnAddToEmpty() {
+    LOG.info("Test bookmark is set after adding to previously empty set.");
+    Iterator<Integer> it = set.getBookmark();
+    assertFalse(it.hasNext());
+    set.add(list.get(0));
+    set.add(list.get(1));
+
+    it = set.getBookmark();
+    assertTrue(it.hasNext());
+    assertEquals(it.next(), list.get(0));
+    assertEquals(it.next(), list.get(1));
+    assertFalse(it.hasNext());
+  }
+
+  @Test(timeout=60000)
+  public void testResetBookmarkPlacesBookmarkAtHead() {
+    set.addAll(list);
+    Iterator<Integer> it = set.getBookmark();
+    final int numAdvance = set.size()/2;
+    for (int i=0; i<numAdvance; i++) {
+      it.next();
+    }
+    set.resetBookmark();
+
+    it = set.getBookmark();
+    assertEquals(it.next(), list.get(0));
+  }
 }