HDFS-9205. Do not schedule corrupt blocks for replication. (szetszwo)

This commit is contained in:
Tsz-Wo Nicholas Sze 2015-10-15 18:07:09 +08:00
parent 76cfd833f3
commit 32c810c819
7 changed files with 147 additions and 198 deletions

View File

@ -704,6 +704,8 @@ Release 2.8.0 - UNRELEASED
HDFS-9188. Make block corruption related tests FsDataset-agnostic. (lei) HDFS-9188. Make block corruption related tests FsDataset-agnostic. (lei)
HDFS-9205. Do not schedule corrupt blocks for replication. (szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -751,6 +751,7 @@ public class BlockManager implements BlockStatsMXBean {
// Remove block from replication queue. // Remove block from replication queue.
NumberReplicas replicas = countNodes(lastBlock); NumberReplicas replicas = countNodes(lastBlock);
neededReplications.remove(lastBlock, replicas.liveReplicas(), neededReplications.remove(lastBlock, replicas.liveReplicas(),
replicas.readOnlyReplicas(),
replicas.decommissionedAndDecommissioning(), getReplication(lastBlock)); replicas.decommissionedAndDecommissioning(), getReplication(lastBlock));
pendingReplications.remove(lastBlock); pendingReplications.remove(lastBlock);
@ -1650,6 +1651,7 @@ public class BlockManager implements BlockStatsMXBean {
nodesContainingLiveReplicas.clear(); nodesContainingLiveReplicas.clear();
DatanodeDescriptor srcNode = null; DatanodeDescriptor srcNode = null;
int live = 0; int live = 0;
int readonly = 0;
int decommissioned = 0; int decommissioned = 0;
int decommissioning = 0; int decommissioning = 0;
int corrupt = 0; int corrupt = 0;
@ -1673,6 +1675,9 @@ public class BlockManager implements BlockStatsMXBean {
nodesContainingLiveReplicas.add(storage); nodesContainingLiveReplicas.add(storage);
live += countableReplica; live += countableReplica;
} }
if (storage.getState() == State.READ_ONLY_SHARED) {
readonly++;
}
containingNodes.add(node); containingNodes.add(node);
// Check if this replica is corrupt // Check if this replica is corrupt
// If so, do not select the node as src node // If so, do not select the node as src node
@ -1707,7 +1712,7 @@ public class BlockManager implements BlockStatsMXBean {
srcNode = node; srcNode = node;
} }
if(numReplicas != null) if(numReplicas != null)
numReplicas.initialize(live, decommissioned, decommissioning, corrupt, numReplicas.set(live, readonly, decommissioned, decommissioning, corrupt,
excess, 0); excess, 0);
return srcNode; return srcNode;
} }
@ -1732,7 +1737,7 @@ public class BlockManager implements BlockStatsMXBean {
} }
NumberReplicas num = countNodes(timedOutItems[i]); NumberReplicas num = countNodes(timedOutItems[i]);
if (isNeededReplication(bi, num.liveReplicas())) { if (isNeededReplication(bi, num.liveReplicas())) {
neededReplications.add(bi, num.liveReplicas(), neededReplications.add(bi, num.liveReplicas(), num.readOnlyReplicas(),
num.decommissionedAndDecommissioning(), getReplication(bi)); num.decommissionedAndDecommissioning(), getReplication(bi));
} }
} }
@ -2614,6 +2619,7 @@ public class BlockManager implements BlockStatsMXBean {
short fileReplication = getExpectedReplicaNum(storedBlock); short fileReplication = getExpectedReplicaNum(storedBlock);
if (!isNeededReplication(storedBlock, numCurrentReplica)) { if (!isNeededReplication(storedBlock, numCurrentReplica)) {
neededReplications.remove(storedBlock, numCurrentReplica, neededReplications.remove(storedBlock, numCurrentReplica,
num.readOnlyReplicas(),
num.decommissionedAndDecommissioning(), fileReplication); num.decommissionedAndDecommissioning(), fileReplication);
} else { } else {
updateNeededReplications(storedBlock, curReplicaDelta, 0); updateNeededReplications(storedBlock, curReplicaDelta, 0);
@ -2846,8 +2852,8 @@ public class BlockManager implements BlockStatsMXBean {
int numCurrentReplica = num.liveReplicas(); int numCurrentReplica = num.liveReplicas();
// add to under-replicated queue if need to be // add to under-replicated queue if need to be
if (isNeededReplication(block, numCurrentReplica)) { if (isNeededReplication(block, numCurrentReplica)) {
if (neededReplications.add(block, numCurrentReplica, num if (neededReplications.add(block, numCurrentReplica, num.readOnlyReplicas(),
.decommissionedAndDecommissioning(), expectedReplication)) { num.decommissionedAndDecommissioning(), expectedReplication)) {
return MisReplicationResult.UNDER_REPLICATED; return MisReplicationResult.UNDER_REPLICATED;
} }
} }
@ -3280,15 +3286,22 @@ public class BlockManager implements BlockStatsMXBean {
* Return the number of nodes hosting a given block, grouped * Return the number of nodes hosting a given block, grouped
* by the state of those replicas. * by the state of those replicas.
*/ */
public NumberReplicas countNodes(BlockInfo b) { public NumberReplicas countNodes(Block b) {
int decommissioned = 0; int decommissioned = 0;
int decommissioning = 0; int decommissioning = 0;
int live = 0; int live = 0;
int readonly = 0;
int corrupt = 0; int corrupt = 0;
int excess = 0; int excess = 0;
int stale = 0; int stale = 0;
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b); Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) { for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
if (storage.getState() == State.FAILED) {
continue;
} else if (storage.getState() == State.READ_ONLY_SHARED) {
readonly++;
continue;
}
final DatanodeDescriptor node = storage.getDatanodeDescriptor(); final DatanodeDescriptor node = storage.getDatanodeDescriptor();
if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) { if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
corrupt++; corrupt++;
@ -3309,7 +3322,8 @@ public class BlockManager implements BlockStatsMXBean {
stale++; stale++;
} }
} }
return new NumberReplicas(live, decommissioned, decommissioning, corrupt, excess, stale); return new NumberReplicas(live, readonly, decommissioned, decommissioning,
corrupt, excess, stale);
} }
/** /**
@ -3444,13 +3458,13 @@ public class BlockManager implements BlockStatsMXBean {
NumberReplicas repl = countNodes(block); NumberReplicas repl = countNodes(block);
int curExpectedReplicas = getReplication(block); int curExpectedReplicas = getReplication(block);
if (isNeededReplication(block, repl.liveReplicas())) { if (isNeededReplication(block, repl.liveReplicas())) {
neededReplications.update(block, repl.liveReplicas(), repl neededReplications.update(block, repl.liveReplicas(), repl.readOnlyReplicas(),
.decommissionedAndDecommissioning(), curExpectedReplicas, repl.decommissionedAndDecommissioning(), curExpectedReplicas,
curReplicasDelta, expectedReplicasDelta); curReplicasDelta, expectedReplicasDelta);
} else { } else {
int oldReplicas = repl.liveReplicas()-curReplicasDelta; int oldReplicas = repl.liveReplicas()-curReplicasDelta;
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta; int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
neededReplications.remove(block, oldReplicas, neededReplications.remove(block, oldReplicas, repl.readOnlyReplicas(),
repl.decommissionedAndDecommissioning(), oldExpectedReplicas); repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
} }
} finally { } finally {
@ -3471,6 +3485,7 @@ public class BlockManager implements BlockStatsMXBean {
final int pending = pendingReplications.getNumReplicas(block); final int pending = pendingReplications.getNumReplicas(block);
if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) { if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) {
neededReplications.add(block, n.liveReplicas() + pending, neededReplications.add(block, n.liveReplicas() + pending,
n.readOnlyReplicas(),
n.decommissionedAndDecommissioning(), expected); n.decommissionedAndDecommissioning(), expected);
} else if (n.liveReplicas() > expected) { } else if (n.liveReplicas() > expected) {
processOverReplicatedBlock(block, expected, null, null); processOverReplicatedBlock(block, expected, null, null);

View File

@ -573,7 +573,7 @@ public class DecommissionManager {
blockManager.isPopulatingReplQueues()) { blockManager.isPopulatingReplQueues()) {
// Process these blocks only when active NN is out of safe mode. // Process these blocks only when active NN is out of safe mode.
blockManager.neededReplications.add(block, blockManager.neededReplications.add(block,
curReplicas, liveReplicas, num.readOnlyReplicas(),
num.decommissionedAndDecommissioning(), num.decommissionedAndDecommissioning(),
block.getReplication()); block.getReplication());
} }

View File

@ -23,6 +23,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
*/ */
public class NumberReplicas { public class NumberReplicas {
private int liveReplicas; private int liveReplicas;
private int readOnlyReplicas;
// Tracks only the decommissioning replicas // Tracks only the decommissioning replicas
private int decommissioning; private int decommissioning;
@ -33,17 +34,18 @@ public class NumberReplicas {
private int replicasOnStaleNodes; private int replicasOnStaleNodes;
NumberReplicas() { NumberReplicas() {
initialize(0, 0, 0, 0, 0, 0); this(0, 0, 0, 0, 0, 0, 0);
} }
NumberReplicas(int live, int decommissioned, int decommissioning, int corrupt, NumberReplicas(int live, int readonly, int decommissioned,
int excess, int stale) { int decommissioning, int corrupt, int excess, int stale) {
initialize(live, decommissioned, decommissioning, corrupt, excess, stale); set(live, readonly, decommissioned, decommissioning, corrupt, excess, stale);
} }
void initialize(int live, int decommissioned, int decommissioning, void set(int live, int readonly, int decommissioned, int decommissioning,
int corrupt, int excess, int stale) { int corrupt, int excess, int stale) {
liveReplicas = live; liveReplicas = live;
readOnlyReplicas = readonly;
this.decommissioning = decommissioning; this.decommissioning = decommissioning;
this.decommissioned = decommissioned; this.decommissioned = decommissioned;
corruptReplicas = corrupt; corruptReplicas = corrupt;
@ -55,6 +57,10 @@ public class NumberReplicas {
return liveReplicas; return liveReplicas;
} }
public int readOnlyReplicas() {
return readOnlyReplicas;
}
/** /**
* *
* @return decommissioned replicas + decommissioning replicas * @return decommissioned replicas + decommissioning replicas

View File

@ -19,9 +19,11 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
/** /**
* Keep prioritized queues of under replicated blocks. * Keep prioritized queues of under replicated blocks.
@ -34,7 +36,11 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
* *
* <p/> * <p/>
* The policy for choosing which priority to give added blocks * The policy for choosing which priority to give added blocks
* is implemented in {@link #getPriority(int, int, int)}. * is implemented in {@link #getPriority(int, int, int, int)}.
* </p> * </p>
* <p>The queue order is as follows:</p> * <p>The queue order is as follows:</p>
* <ol> * <ol>
@ -146,6 +152,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
* @return the priority for the blocks, between 0 and ({@link #LEVEL}-1) * @return the priority for the blocks, between 0 and ({@link #LEVEL}-1)
*/ */
private int getPriority(int curReplicas, private int getPriority(int curReplicas,
int readOnlyReplicas,
int decommissionedReplicas, int decommissionedReplicas,
int expectedReplicas) { int expectedReplicas) {
assert curReplicas >= 0 : "Negative replicas!"; assert curReplicas >= 0 : "Negative replicas!";
@ -158,6 +165,11 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
if (decommissionedReplicas > 0) { if (decommissionedReplicas > 0) {
return QUEUE_HIGHEST_PRIORITY; return QUEUE_HIGHEST_PRIORITY;
} }
if (readOnlyReplicas > 0) {
// only has read-only replicas, highest risk
// since the read-only replicas may go down all together.
return QUEUE_HIGHEST_PRIORITY;
}
//all we have are corrupt blocks //all we have are corrupt blocks
return QUEUE_WITH_CORRUPT_BLOCKS; return QUEUE_WITH_CORRUPT_BLOCKS;
} else if (curReplicas == 1) { } else if (curReplicas == 1) {
@ -183,11 +195,12 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
*/ */
synchronized boolean add(BlockInfo block, synchronized boolean add(BlockInfo block,
int curReplicas, int curReplicas,
int readOnlyReplicas,
int decomissionedReplicas, int decomissionedReplicas,
int expectedReplicas) { int expectedReplicas) {
assert curReplicas >= 0 : "Negative replicas!"; assert curReplicas >= 0 : "Negative replicas!";
int priLevel = getPriority(curReplicas, decomissionedReplicas, final int priLevel = getPriority(curReplicas, readOnlyReplicas,
expectedReplicas); decomissionedReplicas, expectedReplicas);
if(priorityQueues.get(priLevel).add(block)) { if(priorityQueues.get(priLevel).add(block)) {
if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS && if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
expectedReplicas == 1) { expectedReplicas == 1) {
@ -207,11 +220,11 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
/** remove a block from a under replication queue */ /** remove a block from a under replication queue */
synchronized boolean remove(BlockInfo block, synchronized boolean remove(BlockInfo block,
int oldReplicas, int oldReplicas,
int oldReadOnlyReplicas,
int decommissionedReplicas, int decommissionedReplicas,
int oldExpectedReplicas) { int oldExpectedReplicas) {
int priLevel = getPriority(oldReplicas, final int priLevel = getPriority(oldReplicas, oldReadOnlyReplicas,
decommissionedReplicas, decommissionedReplicas, oldExpectedReplicas);
oldExpectedReplicas);
boolean removedBlock = remove(block, priLevel); boolean removedBlock = remove(block, priLevel);
if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS && if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
oldExpectedReplicas == 1 && oldExpectedReplicas == 1 &&
@ -250,10 +263,10 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
// Try to remove the block from all queues if the block was // Try to remove the block from all queues if the block was
// not found in the queue for the given priority level. // not found in the queue for the given priority level.
for (int i = 0; i < LEVEL; i++) { for (int i = 0; i < LEVEL; i++) {
if (priorityQueues.get(i).remove(block)) { if (i != priLevel && priorityQueues.get(i).remove(block)) {
NameNode.blockStateChangeLog.debug( NameNode.blockStateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block" + "BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block" +
" {} from priority queue {}", block, priLevel); " {} from priority queue {}", block, i);
return true; return true;
} }
} }
@ -278,15 +291,15 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
* @param expectedReplicasDelta the change in the expected replica count from before * @param expectedReplicasDelta the change in the expected replica count from before
*/ */
synchronized void update(BlockInfo block, int curReplicas, synchronized void update(BlockInfo block, int curReplicas,
int decommissionedReplicas, int readOnlyReplicas, int decommissionedReplicas,
int curExpectedReplicas, int curExpectedReplicas,
int curReplicasDelta, int expectedReplicasDelta) { int curReplicasDelta, int expectedReplicasDelta) {
int oldReplicas = curReplicas-curReplicasDelta; int oldReplicas = curReplicas-curReplicasDelta;
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta; int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
int curPri = getPriority(curReplicas, decommissionedReplicas, int curPri = getPriority(curReplicas, readOnlyReplicas,
curExpectedReplicas); decommissionedReplicas, curExpectedReplicas);
int oldPri = getPriority(oldReplicas, decommissionedReplicas, int oldPri = getPriority(oldReplicas, readOnlyReplicas,
oldExpectedReplicas); decommissionedReplicas, oldExpectedReplicas);
if(NameNode.stateChangeLog.isDebugEnabled()) { if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " + NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " +
block + block +
@ -336,143 +349,69 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
* @return Return a list of block lists to be replicated. The block list index * @return Return a list of block lists to be replicated. The block list index
* represents its replication priority. * represents its replication priority.
*/ */
public synchronized List<List<BlockInfo>> chooseUnderReplicatedBlocks( synchronized List<List<BlockInfo>> chooseUnderReplicatedBlocks(
int blocksToProcess) { int blocksToProcess) {
// initialize data structure for the return value final List<List<BlockInfo>> blocksToReplicate = new ArrayList<>(LEVEL);
List<List<BlockInfo>> blocksToReplicate = new ArrayList<>(LEVEL);
for (int i = 0; i < LEVEL; i++) {
blocksToReplicate.add(new ArrayList<BlockInfo>());
}
if (size() == 0) { // There are no blocks to collect. int count = 0;
return blocksToReplicate; int priority = 0;
} for (; count < blocksToProcess && priority < LEVEL; priority++) {
if (priority == QUEUE_WITH_CORRUPT_BLOCKS) {
// do not choose corrupted blocks.
continue;
}
int blockCount = 0;
for (int priority = 0; priority < LEVEL; priority++) {
// Go through all blocks that need replications with current priority. // Go through all blocks that need replications with current priority.
BlockIterator neededReplicationsIterator = iterator(priority);
// Set the iterator to the first unprocessed block at this priority level. // Set the iterator to the first unprocessed block at this priority level.
neededReplicationsIterator.setToBookmark(); final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
final List<BlockInfo> blocks = new LinkedList<>();
blocksToProcess = Math.min(blocksToProcess, size()); blocksToReplicate.add(blocks);
if (blockCount == blocksToProcess) {
break; // break if already expected blocks are obtained
}
// Loop through all remaining blocks in the list. // Loop through all remaining blocks in the list.
while (blockCount < blocksToProcess for(; count < blocksToProcess && i.hasNext(); count++) {
&& neededReplicationsIterator.hasNext()) { blocks.add(i.next());
BlockInfo block = neededReplicationsIterator.next();
blocksToReplicate.get(priority).add(block);
blockCount++;
}
if (!neededReplicationsIterator.hasNext()
&& neededReplicationsIterator.getPriority() == LEVEL - 1) {
// Reset all priorities' bookmarks to the beginning because there were
// no recently added blocks in any list.
for (int i = 0; i < LEVEL; i++) {
this.priorityQueues.get(i).resetBookmark();
}
break;
} }
} }
if (priority == LEVEL) {
// Reset all bookmarks because there were no recently added blocks.
for (LightWeightLinkedSet<BlockInfo> q : priorityQueues) {
q.resetBookmark();
}
}
return blocksToReplicate; return blocksToReplicate;
} }
/** returns an iterator of all blocks in a given priority queue */ /** returns an iterator of all blocks in a given priority queue */
synchronized BlockIterator iterator(int level) { synchronized Iterator<BlockInfo> iterator(int level) {
return new BlockIterator(level); return priorityQueues.get(level).iterator();
} }
/** return an iterator of all the under replication blocks */ /** return an iterator of all the under replication blocks */
@Override @Override
public synchronized BlockIterator iterator() { public synchronized Iterator<BlockInfo> iterator() {
return new BlockIterator(); final Iterator<LightWeightLinkedSet<BlockInfo>> q = priorityQueues.iterator();
} return new Iterator<BlockInfo>() {
private Iterator<BlockInfo> b = q.next().iterator();
/** @Override
* An iterator over blocks. public BlockInfo next() {
*/ hasNext();
class BlockIterator implements Iterator<BlockInfo> { return b.next();
private int level;
private boolean isIteratorForLevel = false;
private final List<Iterator<BlockInfo>> iterators = new ArrayList<>();
/**
* Construct an iterator over all queues.
*/
private BlockIterator() {
level=0;
for(int i=0; i<LEVEL; i++) {
iterators.add(priorityQueues.get(i).iterator());
} }
}
/** @Override
* Constrict an iterator for a single queue level public boolean hasNext() {
* @param l the priority level to iterate over for(; !b.hasNext() && q.hasNext(); ) {
*/ b = q.next().iterator();
private BlockIterator(int l) {
level = l;
isIteratorForLevel = true;
iterators.add(priorityQueues.get(level).iterator());
}
private void update() {
if (isIteratorForLevel) {
return;
}
while(level< LEVEL-1 && !iterators.get(level).hasNext()) {
level++;
}
}
@Override
public BlockInfo next() {
if (isIteratorForLevel) {
return iterators.get(0).next();
}
update();
return iterators.get(level).next();
}
@Override
public boolean hasNext() {
if (isIteratorForLevel) {
return iterators.get(0).hasNext();
}
update();
return iterators.get(level).hasNext();
}
@Override
public void remove() {
if (isIteratorForLevel) {
iterators.get(0).remove();
} else {
iterators.get(level).remove();
}
}
int getPriority() {
return level;
}
/**
* Sets iterator(s) to bookmarked elements.
*/
private synchronized void setToBookmark() {
if (this.isIteratorForLevel) {
this.iterators.set(0, priorityQueues.get(this.level)
.getBookmark());
} else {
for (int i = 0; i < LEVEL; i++) {
this.iterators.set(i, priorityQueues.get(i).getBookmark());
} }
return b.hasNext();
} }
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
} }
} }

View File

@ -836,7 +836,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
// Adding the blocks directly to normal priority // Adding the blocks directly to normal priority
neededReplications.add(genBlockInfo(ThreadLocalRandom.current(). neededReplications.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 2, 0, 3); nextLong()), 2, 0, 0, 3);
} }
// Lets wait for the replication interval, to start process normal // Lets wait for the replication interval, to start process normal
// priority blocks // priority blocks
@ -844,7 +844,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
// Adding the block directly to high priority list // Adding the block directly to high priority list
neededReplications.add(genBlockInfo(ThreadLocalRandom.current(). neededReplications.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 1, 0, 3); nextLong()), 1, 0, 0, 3);
// Lets wait for the replication interval // Lets wait for the replication interval
Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL); Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
@ -868,23 +868,23 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
// Adding QUEUE_HIGHEST_PRIORITY block // Adding QUEUE_HIGHEST_PRIORITY block
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current(). underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 1, 0, 3); nextLong()), 1, 0, 0, 3);
// Adding QUEUE_VERY_UNDER_REPLICATED block // Adding QUEUE_VERY_UNDER_REPLICATED block
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current(). underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 2, 0, 7); nextLong()), 2, 0, 0, 7);
// Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block // Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current(). underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 6, 0, 6); nextLong()), 6, 0, 0, 6);
// Adding QUEUE_UNDER_REPLICATED block // Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current(). underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 5, 0, 6); nextLong()), 5, 0, 0, 6);
// Adding QUEUE_WITH_CORRUPT_BLOCKS block // Adding QUEUE_WITH_CORRUPT_BLOCKS block
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current(). underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 0, 0, 3); nextLong()), 0, 0, 0, 3);
} }
// Choose 6 blocks from UnderReplicatedBlocks. Then it should pick 5 blocks // Choose 6 blocks from UnderReplicatedBlocks. Then it should pick 5 blocks
@ -902,13 +902,12 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
// Adding QUEUE_HIGHEST_PRIORITY // Adding QUEUE_HIGHEST_PRIORITY
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current(). underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 1, 0, 3); nextLong()), 0, 1, 0, 3);
// Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 1 block from // Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 1 block from
// QUEUE_HIGHEST_PRIORITY, 4 blocks from QUEUE_REPLICAS_BADLY_DISTRIBUTED // QUEUE_HIGHEST_PRIORITY, 4 blocks from QUEUE_REPLICAS_BADLY_DISTRIBUTED
// and 5 blocks from QUEUE_WITH_CORRUPT_BLOCKS.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(10); chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(10);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 4, 5); assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 4);
// Since it is reached to end of all lists, // Since it is reached to end of all lists,
// should start picking the blocks from start. // should start picking the blocks from start.
@ -920,29 +919,15 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
/** asserts the chosen blocks with expected priority blocks */ /** asserts the chosen blocks with expected priority blocks */
private void assertTheChosenBlocks( private void assertTheChosenBlocks(
List<List<BlockInfo>> chosenBlocks, int firstPrioritySize, List<List<BlockInfo>> chosenBlocks, int... expectedSizes) {
int secondPrioritySize, int thirdPrioritySize, int fourthPrioritySize, int i = 0;
int fifthPrioritySize) { for(; i < chosenBlocks.size(); i++) {
assertEquals( assertEquals("Not returned the expected number for i=" + i,
"Not returned the expected number of QUEUE_HIGHEST_PRIORITY blocks", expectedSizes[i], chosenBlocks.get(i).size());
firstPrioritySize, chosenBlocks.get( }
UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).size()); for(; i < expectedSizes.length; i++) {
assertEquals( assertEquals("Expected size is non-zero for i=" + i, 0, expectedSizes[i]);
"Not returned the expected number of QUEUE_VERY_UNDER_REPLICATED blocks", }
secondPrioritySize, chosenBlocks.get(
UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).size());
assertEquals(
"Not returned the expected number of QUEUE_UNDER_REPLICATED blocks",
thirdPrioritySize, chosenBlocks.get(
UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).size());
assertEquals(
"Not returned the expected number of QUEUE_REPLICAS_BADLY_DISTRIBUTED blocks",
fourthPrioritySize, chosenBlocks.get(
UnderReplicatedBlocks.QUEUE_REPLICAS_BADLY_DISTRIBUTED).size());
assertEquals(
"Not returned the expected number of QUEUE_WITH_CORRUPT_BLOCKS blocks",
fifthPrioritySize, chosenBlocks.get(
UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS).size());
} }
/** /**
@ -1101,14 +1086,14 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
// Adding QUEUE_VERY_UNDER_REPLICATED block // Adding QUEUE_VERY_UNDER_REPLICATED block
final int block1CurReplicas = 2; final int block1CurReplicas = 2;
final int block1ExpectedReplicas = 7; final int block1ExpectedReplicas = 7;
underReplicatedBlocks.add(block1, block1CurReplicas, 0, underReplicatedBlocks.add(block1, block1CurReplicas, 0, 0,
block1ExpectedReplicas); block1ExpectedReplicas);
// Adding QUEUE_VERY_UNDER_REPLICATED block // Adding QUEUE_VERY_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 2, 0, 7); underReplicatedBlocks.add(block2, 2, 0, 0, 7);
// Adding QUEUE_UNDER_REPLICATED block // Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block3, 2, 0, 6); underReplicatedBlocks.add(block3, 2, 0, 0, 6);
List<List<BlockInfo>> chosenBlocks; List<List<BlockInfo>> chosenBlocks;
@ -1119,7 +1104,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
// Increasing the replications will move the block down a // Increasing the replications will move the block down a
// priority. This simulates a replica being completed in between checks. // priority. This simulates a replica being completed in between checks.
underReplicatedBlocks.update(block1, block1CurReplicas+1, 0, underReplicatedBlocks.update(block1, block1CurReplicas+1, 0, 0,
block1ExpectedReplicas, 1, 0); block1ExpectedReplicas, 1, 0);
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
@ -1147,10 +1132,10 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong()); BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
// Adding QUEUE_UNDER_REPLICATED block // Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block1, 0, 1, 1); underReplicatedBlocks.add(block1, 0, 0, 1, 1);
// Adding QUEUE_UNDER_REPLICATED block // Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 0, 1, 1); underReplicatedBlocks.add(block2, 0, 0, 1, 1);
List<List<BlockInfo>> chosenBlocks; List<List<BlockInfo>> chosenBlocks;
@ -1196,10 +1181,10 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong()); BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
// Adding QUEUE_UNDER_REPLICATED block // Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block1, 0, 1, 1); underReplicatedBlocks.add(block1, 0, 0, 1, 1);
// Adding QUEUE_UNDER_REPLICATED block // Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 0, 1, 1); underReplicatedBlocks.add(block2, 0, 0, 1, 1);
List<List<BlockInfo>> chosenBlocks; List<List<BlockInfo>> chosenBlocks;
@ -1258,10 +1243,10 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong()); BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
// Adding QUEUE_UNDER_REPLICATED block // Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block1, 0, 1, 1); underReplicatedBlocks.add(block1, 0, 0, 1, 1);
// Adding QUEUE_UNDER_REPLICATED block // Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 0, 1, 1); underReplicatedBlocks.add(block2, 0, 0, 1, 1);
List<List<BlockInfo>> chosenBlocks; List<List<BlockInfo>> chosenBlocks;

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.blockmanagement; package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Iterator;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.junit.Test; import org.junit.Test;
@ -53,7 +55,7 @@ public class TestUnderReplicatedBlockQueues {
assertEquals(1, queues.size()); assertEquals(1, queues.size());
assertInLevel(queues, block1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY); assertInLevel(queues, block1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);
//repeated additions fail //repeated additions fail
assertFalse(queues.add(block1, 1, 0, 3)); assertFalse(queues.add(block1, 1, 0, 0, 3));
//add a second block with two replicas //add a second block with two replicas
assertAdded(queues, block2, 2, 0, 3); assertAdded(queues, block2, 2, 0, 3);
@ -77,11 +79,11 @@ public class TestUnderReplicatedBlockQueues {
assertAdded(queues, block_corrupt_repl_one, 0, 0, 1); assertAdded(queues, block_corrupt_repl_one, 0, 0, 1);
assertEquals(2, queues.getCorruptBlockSize()); assertEquals(2, queues.getCorruptBlockSize());
assertEquals(1, queues.getCorruptReplOneBlockSize()); assertEquals(1, queues.getCorruptReplOneBlockSize());
queues.update(block_corrupt_repl_one, 0, 0, 3, 0, 2); queues.update(block_corrupt_repl_one, 0, 0, 0, 3, 0, 2);
assertEquals(0, queues.getCorruptReplOneBlockSize()); assertEquals(0, queues.getCorruptReplOneBlockSize());
queues.update(block_corrupt, 0, 0, 1, 0, -2); queues.update(block_corrupt, 0, 0, 0, 1, 0, -2);
assertEquals(1, queues.getCorruptReplOneBlockSize()); assertEquals(1, queues.getCorruptReplOneBlockSize());
queues.update(block_very_under_replicated, 0, 0, 1, -4, -24); queues.update(block_very_under_replicated, 0, 0, 0, 1, -4, -24);
assertEquals(2, queues.getCorruptReplOneBlockSize()); assertEquals(2, queues.getCorruptReplOneBlockSize());
} }
@ -92,7 +94,7 @@ public class TestUnderReplicatedBlockQueues {
int expectedReplicas) { int expectedReplicas) {
assertTrue("Failed to add " + block, assertTrue("Failed to add " + block,
queues.add(block, queues.add(block,
curReplicas, curReplicas, 0,
decomissionedReplicas, decomissionedReplicas,
expectedReplicas)); expectedReplicas));
} }
@ -110,7 +112,7 @@ public class TestUnderReplicatedBlockQueues {
private void assertInLevel(UnderReplicatedBlocks queues, private void assertInLevel(UnderReplicatedBlocks queues,
Block block, Block block,
int level) { int level) {
UnderReplicatedBlocks.BlockIterator bi = queues.iterator(level); final Iterator<BlockInfo> bi = queues.iterator(level);
while (bi.hasNext()) { while (bi.hasNext()) {
Block next = bi.next(); Block next = bi.next();
if (block.equals(next)) { if (block.equals(next)) {