HDFS-15622. Deleted blocks linger in the replications queue. Contributed by Ahmed Hussein.
(cherry picked from commit da1b6e3cc2)
This commit is contained in:
parent
33db7c140b
commit
7a3085d552
|
@ -18,6 +18,7 @@
|
||||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -500,6 +501,8 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
|
||||||
* the block count is met or iteration reaches the end of the lowest priority
|
* the block count is met or iteration reaches the end of the lowest priority
|
||||||
* list, in which case bookmarks for each block list are reset to the heads
|
* list, in which case bookmarks for each block list are reset to the heads
|
||||||
* of their respective lists.
|
* of their respective lists.
|
||||||
|
* If a block is deleted (has invalid bcId), it will be removed from the low
|
||||||
|
* redundancy queues.
|
||||||
*
|
*
|
||||||
* @param blocksToProcess - number of blocks to fetch from low redundancy
|
* @param blocksToProcess - number of blocks to fetch from low redundancy
|
||||||
* blocks.
|
* blocks.
|
||||||
|
@ -515,21 +518,32 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
|
||||||
|
|
||||||
int count = 0;
|
int count = 0;
|
||||||
int priority = 0;
|
int priority = 0;
|
||||||
|
HashSet<BlockInfo> toRemove = new HashSet<>();
|
||||||
for (; count < blocksToProcess && priority < LEVEL; priority++) {
|
for (; count < blocksToProcess && priority < LEVEL; priority++) {
|
||||||
if (priority == QUEUE_WITH_CORRUPT_BLOCKS) {
|
|
||||||
// do not choose corrupted blocks.
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Go through all blocks that need reconstructions with current priority.
|
// Go through all blocks that need reconstructions with current priority.
|
||||||
// Set the iterator to the first unprocessed block at this priority level
|
// Set the iterator to the first unprocessed block at this priority level
|
||||||
|
// We do not want to skip QUEUE_WITH_CORRUPT_BLOCKS because we still need
|
||||||
|
// to look for deleted blocks if any.
|
||||||
|
final boolean inCorruptLevel = (QUEUE_WITH_CORRUPT_BLOCKS == priority);
|
||||||
final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
|
final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
|
||||||
final List<BlockInfo> blocks = new LinkedList<>();
|
final List<BlockInfo> blocks = new LinkedList<>();
|
||||||
|
if (!inCorruptLevel) {
|
||||||
blocksToReconstruct.add(blocks);
|
blocksToReconstruct.add(blocks);
|
||||||
// Loop through all remaining blocks in the list.
|
|
||||||
for(; count < blocksToProcess && i.hasNext(); count++) {
|
|
||||||
blocks.add(i.next());
|
|
||||||
}
|
}
|
||||||
|
for(; count < blocksToProcess && i.hasNext(); count++) {
|
||||||
|
BlockInfo block = i.next();
|
||||||
|
if (block.isDeleted()) {
|
||||||
|
toRemove.add(block);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (!inCorruptLevel) {
|
||||||
|
blocks.add(block);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (BlockInfo bInfo : toRemove) {
|
||||||
|
remove(bInfo, priority);
|
||||||
|
}
|
||||||
|
toRemove.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (priority == LEVEL || resetIterators) {
|
if (priority == LEVEL || resetIterators) {
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.StripedFileTestUtil;
|
import org.apache.hadoop.hdfs.StripedFileTestUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
@ -41,6 +42,7 @@ import static org.junit.Assert.fail;
|
||||||
public class TestLowRedundancyBlockQueues {
|
public class TestLowRedundancyBlockQueues {
|
||||||
|
|
||||||
private final ErasureCodingPolicy ecPolicy;
|
private final ErasureCodingPolicy ecPolicy;
|
||||||
|
private static AtomicLong mockINodeId = new AtomicLong(0);
|
||||||
|
|
||||||
public TestLowRedundancyBlockQueues(ErasureCodingPolicy policy) {
|
public TestLowRedundancyBlockQueues(ErasureCodingPolicy policy) {
|
||||||
ecPolicy = policy;
|
ecPolicy = policy;
|
||||||
|
@ -52,7 +54,15 @@ public class TestLowRedundancyBlockQueues {
|
||||||
}
|
}
|
||||||
|
|
||||||
private BlockInfo genBlockInfo(long id) {
|
private BlockInfo genBlockInfo(long id) {
|
||||||
return new BlockInfoContiguous(new Block(id), (short) 3);
|
return genBlockInfo(id, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private BlockInfo genBlockInfo(long id, boolean isCorruptBlock) {
|
||||||
|
BlockInfo bInfo = new BlockInfoContiguous(new Block(id), (short) 3);
|
||||||
|
if (!isCorruptBlock) {
|
||||||
|
bInfo.setBlockCollectionId(mockINodeId.incrementAndGet());
|
||||||
|
}
|
||||||
|
return bInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
private BlockInfo genStripedBlockInfo(long id, long numBytes) {
|
private BlockInfo genStripedBlockInfo(long id, long numBytes) {
|
||||||
|
@ -93,6 +103,41 @@ public class TestLowRedundancyBlockQueues {
|
||||||
queues.getHighestPriorityECBlockCount());
|
queues.getHighestPriorityECBlockCount());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that deleted blocks should not be returned by
|
||||||
|
* {@link LowRedundancyBlocks#chooseLowRedundancyBlocks(int, boolean)}.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDeletedBlocks() throws Exception {
|
||||||
|
int numBlocks = 5;
|
||||||
|
LowRedundancyBlocks queues = new LowRedundancyBlocks();
|
||||||
|
// create 5 blockinfos. The first one has no bcId, so it counts as deleted.
|
||||||
|
for (int ind = 0; ind < numBlocks; ind++) {
|
||||||
|
BlockInfo blockInfo = genBlockInfo(ind, ind == 0);
|
||||||
|
queues.add(blockInfo, 2, 0, 0, 3);
|
||||||
|
}
|
||||||
|
List<List<BlockInfo>> blocks;
|
||||||
|
// Get two blocks from the queue, but we should only get one because first
|
||||||
|
// block is deleted.
|
||||||
|
blocks = queues.chooseLowRedundancyBlocks(2, false);
|
||||||
|
|
||||||
|
assertEquals(1, blocks.get(2).size());
|
||||||
|
assertEquals(1, blocks.get(2).get(0).getBlockId());
|
||||||
|
|
||||||
|
// Get the next blocks - should be ID 2
|
||||||
|
blocks = queues.chooseLowRedundancyBlocks(1, false);
|
||||||
|
assertEquals(2, blocks.get(2).get(0).getBlockId());
|
||||||
|
|
||||||
|
// Get the next block, but also reset this time - should be ID 3 returned
|
||||||
|
blocks = queues.chooseLowRedundancyBlocks(1, true);
|
||||||
|
assertEquals(3, blocks.get(2).get(0).getBlockId());
|
||||||
|
|
||||||
|
// Get one more block and due to resetting the queue it will be block id 1
|
||||||
|
blocks = queues.chooseLowRedundancyBlocks(1, false);
|
||||||
|
assertEquals(1, blocks.get(2).get(0).getBlockId());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueuePositionCanBeReset() throws Throwable {
|
public void testQueuePositionCanBeReset() throws Throwable {
|
||||||
LowRedundancyBlocks queues = new LowRedundancyBlocks();
|
LowRedundancyBlocks queues = new LowRedundancyBlocks();
|
||||||
|
|
|
@ -38,6 +38,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.AddBlockFlag;
|
import org.apache.hadoop.hdfs.AddBlockFlag;
|
||||||
|
@ -81,7 +82,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
||||||
// The interval for marking a datanode as stale,
|
// The interval for marking a datanode as stale,
|
||||||
private static final long staleInterval =
|
private static final long staleInterval =
|
||||||
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT;
|
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT;
|
||||||
|
private static AtomicLong mockINodeId = new AtomicLong(0);
|
||||||
@Rule
|
@Rule
|
||||||
public ExpectedException exception = ExpectedException.none();
|
public ExpectedException exception = ExpectedException.none();
|
||||||
|
|
||||||
|
@ -824,7 +825,15 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
private BlockInfo genBlockInfo(long id) {
|
private BlockInfo genBlockInfo(long id) {
|
||||||
return new BlockInfoContiguous(new Block(id), (short) 3);
|
return genBlockInfo(id, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private BlockInfo genBlockInfo(long id, boolean isBlockCorrupted) {
|
||||||
|
BlockInfo bInfo = new BlockInfoContiguous(new Block(id), (short) 3);
|
||||||
|
if (!isBlockCorrupted) {
|
||||||
|
bInfo.setBlockCollectionId(mockINodeId.incrementAndGet());
|
||||||
|
}
|
||||||
|
return bInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -847,7 +856,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
||||||
// Adding the blocks directly to normal priority
|
// Adding the blocks directly to normal priority
|
||||||
|
|
||||||
neededReconstruction.add(genBlockInfo(ThreadLocalRandom.current().
|
neededReconstruction.add(genBlockInfo(ThreadLocalRandom.current().
|
||||||
nextLong()), 2, 0, 0, 3);
|
nextLong(), true), 2, 0, 0, 3);
|
||||||
}
|
}
|
||||||
// Lets wait for the replication interval, to start process normal
|
// Lets wait for the replication interval, to start process normal
|
||||||
// priority blocks
|
// priority blocks
|
||||||
|
@ -855,7 +864,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
|
||||||
|
|
||||||
// Adding the block directly to high priority list
|
// Adding the block directly to high priority list
|
||||||
neededReconstruction.add(genBlockInfo(ThreadLocalRandom.current().
|
neededReconstruction.add(genBlockInfo(ThreadLocalRandom.current().
|
||||||
nextLong()), 1, 0, 0, 3);
|
nextLong(), true), 1, 0, 0, 3);
|
||||||
|
|
||||||
// Lets wait for the replication interval
|
// Lets wait for the replication interval
|
||||||
Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
|
Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
|
||||||
|
|
Loading…
Reference in New Issue