diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
index f6ef248e79f..d719e937f2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -500,6 +501,8 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
   * the block count is met or iteration reaches the end of the lowest priority
   * list, in which case bookmarks for each block list are reset to the heads
   * of their respective lists.
+  * If a block is deleted (i.e. it has an invalid bcId), it will be removed
+  * from the low redundancy queues.
   *
   * @param blocksToProcess - number of blocks to fetch from low redundancy
   *     blocks.
@@ -515,21 +518,32 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
 
     int count = 0;
     int priority = 0;
+    HashSet<BlockInfo> toRemove = new HashSet<>();
     for (; count < blocksToProcess && priority < LEVEL; priority++) {
-      if (priority == QUEUE_WITH_CORRUPT_BLOCKS) {
-        // do not choose corrupted blocks.
-        continue;
-      }
-
       // Go through all blocks that need reconstructions with current priority.
       // Set the iterator to the first unprocessed block at this priority level
+      // We do not want to skip QUEUE_WITH_CORRUPT_BLOCKS because we still need
+      // to look for deleted blocks, if any, in that queue.
+      final boolean inCorruptLevel = (QUEUE_WITH_CORRUPT_BLOCKS == priority);
       final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
       final List<BlockInfo> blocks = new LinkedList<>();
-      blocksToReconstruct.add(blocks);
-      // Loop through all remaining blocks in the list.
-      for(; count < blocksToProcess && i.hasNext(); count++) {
-        blocks.add(i.next());
+      if (!inCorruptLevel) {
+        blocksToReconstruct.add(blocks);
       }
+      for(; count < blocksToProcess && i.hasNext(); count++) {
+        BlockInfo block = i.next();
+        if (block.isDeleted()) {
+          toRemove.add(block);
+          continue;
+        }
+        if (!inCorruptLevel) {
+          blocks.add(block);
+        }
+      }
+      for (BlockInfo bInfo : toRemove) {
+        remove(bInfo, priority);
+      }
+      toRemove.clear();
     }
 
     if (priority == LEVEL || resetIterators) {
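Reviewer note, not part of the patch: the purge above hinges on BlockInfo.isDeleted(), which is true for any block whose block collection ID has never been assigned (it stays at the invalid INode ID until setBlockCollectionId() is called); that is the "invalid bcId" the new javadoc refers to, and it is the same contract the test helpers below rely on. A minimal sketch of that contract; BlockInfo, BlockInfoContiguous and Block are the real classes touched here, while the scenario itself is illustrative:

    // Sketch: what "deleted" means to the new purge logic in
    // chooseLowRedundancyBlocks. Assumes it runs inside
    // org.apache.hadoop.hdfs.server.blockmanagement, where these
    // package-private classes are visible.
    BlockInfo block = new BlockInfoContiguous(new Block(1), (short) 3);
    assert block.isDeleted();        // no block collection (INode) ID assigned yet
    block.setBlockCollectionId(42L); // attach it to some live INode ID
    assert !block.isDeleted();       // now treated as live, eligible to be returned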
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
index e63a8d8dccb..ef614fb362d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -41,6 +42,7 @@ import static org.junit.Assert.fail;
 public class TestLowRedundancyBlockQueues {
 
   private final ErasureCodingPolicy ecPolicy;
+  private static AtomicLong mockINodeId = new AtomicLong(0);
 
   public TestLowRedundancyBlockQueues(ErasureCodingPolicy policy) {
     ecPolicy = policy;
@@ -52,7 +54,15 @@ public class TestLowRedundancyBlockQueues {
   }
 
   private BlockInfo genBlockInfo(long id) {
-    return new BlockInfoContiguous(new Block(id), (short) 3);
+    return genBlockInfo(id, false);
+  }
+
+  private BlockInfo genBlockInfo(long id, boolean isCorruptBlock) {
+    BlockInfo bInfo = new BlockInfoContiguous(new Block(id), (short) 3);
+    if (!isCorruptBlock) {
+      bInfo.setBlockCollectionId(mockINodeId.incrementAndGet());
+    }
+    return bInfo;
   }
 
   private BlockInfo genStripedBlockInfo(long id, long numBytes) {
@@ -93,6 +103,41 @@ public class TestLowRedundancyBlockQueues {
         queues.getHighestPriorityECBlockCount());
   }
 
+  /**
+   * Tests that deleted blocks are not returned by
+   * {@link LowRedundancyBlocks#chooseLowRedundancyBlocks(int, boolean)}.
+   * @throws Exception
+   */
+  @Test
+  public void testDeletedBlocks() throws Exception {
+    int numBlocks = 5;
+    LowRedundancyBlocks queues = new LowRedundancyBlocks();
+    // Create 5 BlockInfos; the first one is deleted (no bcId assigned).
+    for (int ind = 0; ind < numBlocks; ind++) {
+      BlockInfo blockInfo = genBlockInfo(ind, ind == 0);
+      queues.add(blockInfo, 2, 0, 0, 3);
+    }
+    List<List<BlockInfo>> blocks;
+    // Ask for two blocks from the queue, but we should only get one because
+    // the first block is deleted.
+    blocks = queues.chooseLowRedundancyBlocks(2, false);
+
+    assertEquals(1, blocks.get(2).size());
+    assertEquals(1, blocks.get(2).get(0).getBlockId());
+
+    // Get the next block - should be ID 2
+    blocks = queues.chooseLowRedundancyBlocks(1, false);
+    assertEquals(2, blocks.get(2).get(0).getBlockId());
+
+    // Get the next block, but also reset this time - should be ID 3 returned
+    blocks = queues.chooseLowRedundancyBlocks(1, true);
+    assertEquals(3, blocks.get(2).get(0).getBlockId());
+
+    // Get one more block; due to the reset it is block ID 1 again
+    blocks = queues.chooseLowRedundancyBlocks(1, false);
+    assertEquals(1, blocks.get(2).get(0).getBlockId());
+  }
+
   @Test
   public void testQueuePositionCanBeReset() throws Throwable {
     LowRedundancyBlocks queues = new LowRedundancyBlocks();
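Reviewer note, not part of the patch: two details of the arithmetic in testDeletedBlocks are easy to miss. First, a deleted block still consumes one unit of the blocksToProcess budget in the pass that purges it, which is why the first call asks for two blocks but only block 1 comes back. Second, indexing the result with blocks.get(2) stays valid because chooseLowRedundancyBlocks appends one list per priority level it visits and skips appending only for QUEUE_WITH_CORRUPT_BLOCKS, the highest-numbered level (LEVEL - 1), so list index equals priority for every reconstruction level. A sketch of the result shape for that first call, under the setup of the test above:

    // blocksToProcess = 2: block 0 is purged (budget 1), block 1 is returned
    // (budget 2), then the scan stops before reaching the remaining levels.
    List<List<BlockInfo>> blocks = queues.chooseLowRedundancyBlocks(2, false);
    // blocks.get(0) -> QUEUE_HIGHEST_PRIORITY, empty here
    // blocks.get(1) -> QUEUE_VERY_LOW_REDUNDANCY, empty here
    // blocks.get(2) -> QUEUE_LOW_REDUNDANCY, holds block 1 (2 of 3 replicas)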
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index 6936e44723e..efe4172340d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.AddBlockFlag;
@@ -81,7 +82,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
   // The interval for marking a datanode as stale,
   private static final long staleInterval =
       DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT;
-
+  private static AtomicLong mockINodeId = new AtomicLong(0);
   @Rule
   public ExpectedException exception = ExpectedException.none();
 
@@ -824,7 +825,15 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
   }
 
   private BlockInfo genBlockInfo(long id) {
-    return new BlockInfoContiguous(new Block(id), (short) 3);
+    return genBlockInfo(id, false);
+  }
+
+  private BlockInfo genBlockInfo(long id, boolean isBlockCorrupted) {
+    BlockInfo bInfo = new BlockInfoContiguous(new Block(id), (short) 3);
+    if (!isBlockCorrupted) {
+      bInfo.setBlockCollectionId(mockINodeId.incrementAndGet());
+    }
+    return bInfo;
   }
 
   /**
@@ -847,7 +856,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
 
       // Adding the blocks directly to normal priority
       neededReconstruction.add(genBlockInfo(ThreadLocalRandom.current().
-          nextLong()), 2, 0, 0, 3);
+          nextLong(), true), 2, 0, 0, 3);
     }
     // Lets wait for the replication interval, to start process normal
     // priority blocks
@@ -855,8 +864,8 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
 
     // Adding the block directly to high priority list
     neededReconstruction.add(genBlockInfo(ThreadLocalRandom.current().
-        nextLong()), 1, 0, 0, 3);
+        nextLong(), true), 1, 0, 0, 3);
 
     // Lets wait for the replication interval
     Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
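Reviewer note, not part of the patch: the polarity of the new two-argument helper is worth spelling out, since the modified test now passes true for every block it feeds into neededReconstruction. The flag name says corrupted, but what it actually controls is whether the block looks deleted: with true the bcId is never assigned, so isDeleted() reports true. Presumably that is deliberate, so the redundancy monitor can drain these synthetic blocks through the new purge path instead of attempting reconstruction against a block collection that does not exist. The helper's contract, under that reading:

    // genBlockInfo(id)       -> bcId assigned from mockINodeId, isDeleted() == false
    // genBlockInfo(id, true) -> bcId left unassigned,           isDeleted() == true
    BlockInfo live = genBlockInfo(7);
    BlockInfo gone = genBlockInfo(8, true);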