HDFS-9857. Erasure Coding: Rename replication-based names in BlockManager to more generic [part-1]. Contributed by Rakesh R.

This commit is contained in:
Zhe Zhang 2016-03-16 16:53:58 -07:00
parent 605fdcbb81
commit 32d043d9c5
9 changed files with 351 additions and 333 deletions

View File

@ -149,7 +149,7 @@ public class BlockManager implements BlockStatsMXBean {
private volatile long pendingReplicationBlocksCount = 0L;
private volatile long corruptReplicaBlocksCount = 0L;
private volatile long underReplicatedBlocksCount = 0L;
private volatile long lowRedundancyBlocksCount = 0L;
private volatile long scheduledReplicationBlocksCount = 0L;
/** flag indicating whether replication queues have been initialized */
@ -166,7 +166,7 @@ public class BlockManager implements BlockStatsMXBean {
}
/** Used by metrics */
public long getUnderReplicatedBlocksCount() {
return underReplicatedBlocksCount;
return lowRedundancyBlocksCount;
}
/** Used by metrics */
public long getCorruptReplicaBlocksCount() {
@ -250,9 +250,10 @@ public class BlockManager implements BlockStatsMXBean {
/**
* Store set of Blocks that need to be replicated 1 or more times.
* We also store pending replication-orders.
* We also store pending reconstruction-orders.
*/
public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
public final LowRedundancyBlocks neededReconstruction =
new LowRedundancyBlocks();
@VisibleForTesting
final PendingReplicationBlocks pendingReplications;
@ -294,20 +295,20 @@ public class BlockManager implements BlockStatsMXBean {
private boolean shouldPostponeBlocksFromFuture = false;
/**
* Process replication queues asynchronously to allow namenode safemode exit
* and failover to be faster. HDFS-5496
* Process reconstruction queues asynchronously to allow namenode safemode
* exit and failover to be faster. HDFS-5496.
*/
private Daemon replicationQueuesInitializer = null;
private Daemon reconstructionQueuesInitializer = null;
/**
* Number of blocks to process asychronously for replication queues
* Number of blocks to process asychronously for reconstruction queues
* initialization once aquired the namesystem lock. Remaining blocks will be
* processed again after aquiring lock again.
*/
private int numBlocksPerIteration;
/**
* Progress of the Replication queues initialisation.
* Progress of the Reconstruction queues initialisation.
*/
private double replicationQueuesInitProgress = 0.0;
private double reconstructionQueuesInitProgress = 0.0;
/** for block replicas placement */
private BlockPlacementPolicies placementPolicies;
@ -576,12 +577,12 @@ public class BlockManager implements BlockStatsMXBean {
out.println("Live Datanodes: " + live.size());
out.println("Dead Datanodes: " + dead.size());
//
// Dump contents of neededReplication
// Dump contents of neededReconstruction
//
synchronized (neededReplications) {
out.println("Metasave: Blocks waiting for replication: " +
neededReplications.size());
for (Block block : neededReplications) {
synchronized (neededReconstruction) {
out.println("Metasave: Blocks waiting for reconstruction: "
+ neededReconstruction.size());
for (Block block : neededReconstruction) {
dumpBlockMeta(block, out);
}
}
@ -616,7 +617,7 @@ public class BlockManager implements BlockStatsMXBean {
// source node returned is not used
chooseSourceDatanodes(getStoredBlock(block), containingNodes,
containingLiveReplicasNodes, numReplicas,
new LinkedList<Byte>(), UnderReplicatedBlocks.LEVEL);
new LinkedList<Byte>(), LowRedundancyBlocks.LEVEL);
// containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
// not included in the numReplicas.liveReplicas() count
@ -849,9 +850,9 @@ public class BlockManager implements BlockStatsMXBean {
// is happening
bc.convertLastBlockToUC(lastBlock, targets);
// Remove block from replication queue.
// Remove block from reconstruction queue.
NumberReplicas replicas = countNodes(lastBlock);
neededReplications.remove(lastBlock, replicas.liveReplicas(),
neededReconstruction.remove(lastBlock, replicas.liveReplicas(),
replicas.readOnlyReplicas(),
replicas.decommissionedAndDecommissioning(), getReplication(lastBlock));
pendingReplications.remove(lastBlock);
@ -1365,8 +1366,8 @@ public class BlockManager implements BlockStatsMXBean {
// the block is over-replicated so invalidate the replicas immediately
invalidateBlock(b, node, numberOfReplicas);
} else if (isPopulatingReplQueues()) {
// add the block to neededReplication
updateNeededReplications(b.getStored(), -1, 0);
// add the block to neededReconstruction
updateNeededReconstructions(b.getStored(), -1, 0);
}
}
@ -1418,13 +1419,13 @@ public class BlockManager implements BlockStatsMXBean {
void updateState() {
pendingReplicationBlocksCount = pendingReplications.size();
underReplicatedBlocksCount = neededReplications.size();
lowRedundancyBlocksCount = neededReconstruction.size();
corruptReplicaBlocksCount = corruptReplicas.size();
}
/** Return number of under-replicated but not missing blocks */
/** Return number of low redundancy blocks but not missing blocks. */
public int getUnderReplicatedNotMissingBlocks() {
return neededReplications.getUnderReplicatedBlockCount();
return neededReconstruction.getLowRedundancyBlockCount();
}
/**
@ -1452,25 +1453,26 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
* Scan blocks in {@link #neededReplications} and assign reconstruction
* Scan blocks in {@link #neededReconstruction} and assign reconstruction
* (replication or erasure coding) work to data-nodes they belong to.
*
* The number of process blocks equals either twice the number of live
* data-nodes or the number of under-replicated blocks whichever is less.
* data-nodes or the number of low redundancy blocks whichever is less.
*
* @return number of blocks scheduled for replication during this iteration.
* @return number of blocks scheduled for reconstruction during this
* iteration.
*/
int computeBlockReconstructionWork(int blocksToProcess) {
List<List<BlockInfo>> blocksToReplicate = null;
List<List<BlockInfo>> blocksToReconstruct = null;
namesystem.writeLock();
try {
// Choose the blocks to be replicated
blocksToReplicate = neededReplications
.chooseUnderReplicatedBlocks(blocksToProcess);
// Choose the blocks to be reconstructed
blocksToReconstruct = neededReconstruction
.chooseLowRedundancyBlocks(blocksToProcess);
} finally {
namesystem.writeUnlock();
}
return computeReconstructionWorkForBlocks(blocksToReplicate);
return computeReconstructionWorkForBlocks(blocksToReconstruct);
}
/**
@ -1489,7 +1491,7 @@ public class BlockManager implements BlockStatsMXBean {
// Step 1: categorize at-risk blocks into replication and EC tasks
namesystem.writeLock();
try {
synchronized (neededReplications) {
synchronized (neededReconstruction) {
for (int priority = 0; priority < blocksToReconstruct
.size(); priority++) {
for (BlockInfo block : blocksToReconstruct.get(priority)) {
@ -1533,7 +1535,7 @@ public class BlockManager implements BlockStatsMXBean {
continue;
}
synchronized (neededReplications) {
synchronized (neededReconstruction) {
if (validateReconstructionWork(rw)) {
scheduledWork++;
}
@ -1544,7 +1546,7 @@ public class BlockManager implements BlockStatsMXBean {
}
if (blockLog.isDebugEnabled()) {
// log which blocks have been scheduled for replication
// log which blocks have been scheduled for reconstruction
for(BlockReconstructionWork rw : reconWork){
DatanodeStorageInfo[] targets = rw.getTargets();
if (targets != null && targets.length != 0) {
@ -1558,8 +1560,9 @@ public class BlockManager implements BlockStatsMXBean {
}
}
blockLog.debug("BLOCK* neededReplications = {} pendingReplications = {}",
neededReplications.size(), pendingReplications.size());
blockLog.debug(
"BLOCK* neededReconstruction = {} pendingReplications = {}",
neededReconstruction.size(), pendingReplications.size());
}
return scheduledWork;
@ -1576,8 +1579,8 @@ public class BlockManager implements BlockStatsMXBean {
int priority) {
// skip abandoned block or block reopened for append
if (block.isDeleted() || !block.isCompleteOrCommitted()) {
// remove from neededReplications
neededReplications.remove(block, priority);
// remove from neededReconstruction
neededReconstruction.remove(block, priority);
return null;
}
@ -1605,8 +1608,8 @@ public class BlockManager implements BlockStatsMXBean {
int pendingNum = pendingReplications.getNumReplicas(block);
if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
requiredReplication)) {
neededReplications.remove(block, priority);
blockLog.debug("BLOCK* Removing {} from neededReplications as" +
neededReconstruction.remove(block, priority);
blockLog.debug("BLOCK* Removing {} from neededReconstruction as" +
" it has enough replicas", block);
return null;
}
@ -1662,7 +1665,7 @@ public class BlockManager implements BlockStatsMXBean {
// Recheck since global lock was released
// skip abandoned block or block reopened for append
if (block.isDeleted() || !block.isCompleteOrCommitted()) {
neededReplications.remove(block, priority);
neededReconstruction.remove(block, priority);
rw.resetTargets();
return false;
}
@ -1673,7 +1676,7 @@ public class BlockManager implements BlockStatsMXBean {
final int pendingNum = pendingReplications.getNumReplicas(block);
if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
requiredReplication)) {
neededReplications.remove(block, priority);
neededReconstruction.remove(block, priority);
rw.resetTargets();
blockLog.debug("BLOCK* Removing {} from neededReplications as" +
" it has enough replicas", block);
@ -1705,9 +1708,9 @@ public class BlockManager implements BlockStatsMXBean {
+ "pendingReplications", block);
int numEffectiveReplicas = numReplicas.liveReplicas() + pendingNum;
// remove from neededReplications
// remove from neededReconstruction
if(numEffectiveReplicas + targets.length >= requiredReplication) {
neededReplications.remove(block, priority);
neededReconstruction.remove(block, priority);
}
return true;
}
@ -1852,7 +1855,7 @@ public class BlockManager implements BlockStatsMXBean {
continue;
}
if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY
if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY
&& !node.isDecommissionInProgress()
&& node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
continue; // already reached replication limit
@ -1905,9 +1908,10 @@ public class BlockManager implements BlockStatsMXBean {
continue;
}
NumberReplicas num = countNodes(timedOutItems[i]);
if (isNeededReplication(bi, num.liveReplicas())) {
neededReplications.add(bi, num.liveReplicas(), num.readOnlyReplicas(),
num.decommissionedAndDecommissioning(), getReplication(bi));
if (isNeededReconstruction(bi, num.liveReplicas())) {
neededReconstruction.add(bi, num.liveReplicas(),
num.readOnlyReplicas(), num.decommissionedAndDecommissioning(),
getReplication(bi));
}
}
} finally {
@ -2777,7 +2781,7 @@ public class BlockManager implements BlockStatsMXBean {
* intended for use with initial block report at startup. If not in startup
* safe mode, will call standard addStoredBlock(). Assumes this method is
* called "immediately" so there is no need to refresh the storedBlock from
* blocksMap. Doesn't handle underReplication/overReplication, or worry about
* blocksMap. Doesn't handle low redundancy/extra redundancy, or worry about
* pendingReplications or corruptReplicas, because it's in startup safe mode.
* Doesn't log every block, because there are typically millions of them.
*
@ -2812,7 +2816,7 @@ public class BlockManager implements BlockStatsMXBean {
/**
* Modify (block-->datanode) map. Remove block from set of
* needed replications if this takes care of the problem.
* needed reconstruction if this takes care of the problem.
* @return the block that is stored in blocksMap.
*/
private Block addStoredBlock(final BlockInfo block,
@ -2890,24 +2894,25 @@ public class BlockManager implements BlockStatsMXBean {
return storedBlock;
}
// do not try to handle over/under-replicated blocks during first safe mode
// do not try to handle extra/low redundancy blocks during first safe mode
if (!isPopulatingReplQueues()) {
return storedBlock;
}
// handle underReplication/overReplication
// handle low redundancy/extra redundancy
short fileReplication = getExpectedReplicaNum(storedBlock);
if (!isNeededReplication(storedBlock, numCurrentReplica)) {
neededReplications.remove(storedBlock, numCurrentReplica,
if (!isNeededReconstruction(storedBlock, numCurrentReplica)) {
neededReconstruction.remove(storedBlock, numCurrentReplica,
num.readOnlyReplicas(),
num.decommissionedAndDecommissioning(), fileReplication);
} else {
updateNeededReplications(storedBlock, curReplicaDelta, 0);
updateNeededReconstructions(storedBlock, curReplicaDelta, 0);
}
if (shouldProcessOverReplicated(num, fileReplication)) {
processOverReplicatedBlock(storedBlock, fileReplication, node, delNodeHint);
if (shouldProcessExtraRedundancy(num, fileReplication)) {
processExtraRedundancyBlock(storedBlock, fileReplication, node,
delNodeHint);
}
// If the file replication has reached desired value
// If the file redundancy has reached desired value
// we can remove any corrupt replicas the block may have
int corruptReplicasCount = corruptReplicas.numCorruptReplicas(storedBlock);
int numCorruptNodes = num.corruptReplicas();
@ -2922,7 +2927,7 @@ public class BlockManager implements BlockStatsMXBean {
return storedBlock;
}
private boolean shouldProcessOverReplicated(NumberReplicas num,
private boolean shouldProcessExtraRedundancy(NumberReplicas num,
int expectedNum) {
final int numCurrent = num.liveReplicas();
return numCurrent > expectedNum ||
@ -2972,42 +2977,44 @@ public class BlockManager implements BlockStatsMXBean {
/**
* For each block in the name-node verify whether it belongs to any file,
* over or under replicated. Place it into the respective queue.
* extra or low redundancy. Place it into the respective queue.
*/
public void processMisReplicatedBlocks() {
assert namesystem.hasWriteLock();
stopReplicationInitializer();
neededReplications.clear();
replicationQueuesInitializer = new Daemon() {
stopReconstructionInitializer();
neededReconstruction.clear();
reconstructionQueuesInitializer = new Daemon() {
@Override
public void run() {
try {
processMisReplicatesAsync();
} catch (InterruptedException ie) {
LOG.info("Interrupted while processing replication queues.");
LOG.info("Interrupted while processing reconstruction queues.");
} catch (Exception e) {
LOG.error("Error while processing replication queues async", e);
LOG.error("Error while processing reconstruction queues async", e);
}
}
};
replicationQueuesInitializer.setName("Replication Queue Initializer");
replicationQueuesInitializer.start();
reconstructionQueuesInitializer
.setName("Reconstruction Queue Initializer");
reconstructionQueuesInitializer.start();
}
/*
* Stop the ongoing initialisation of replication queues
* Stop the ongoing initialisation of reconstruction queues
*/
private void stopReplicationInitializer() {
if (replicationQueuesInitializer != null) {
replicationQueuesInitializer.interrupt();
private void stopReconstructionInitializer() {
if (reconstructionQueuesInitializer != null) {
reconstructionQueuesInitializer.interrupt();
try {
replicationQueuesInitializer.join();
reconstructionQueuesInitializer.join();
} catch (final InterruptedException e) {
LOG.warn("Interrupted while waiting for replicationQueueInitializer. Returning..");
LOG.warn("Interrupted while waiting for "
+ "reconstructionQueueInitializer. Returning..");
return;
} finally {
replicationQueuesInitializer = null;
reconstructionQueuesInitializer = null;
}
}
}
@ -3025,7 +3032,7 @@ public class BlockManager implements BlockStatsMXBean {
long startTimeMisReplicatedScan = Time.monotonicNow();
Iterator<BlockInfo> blocksItr = blocksMap.getBlocks().iterator();
long totalBlocks = blocksMap.size();
replicationQueuesInitProgress = 0;
reconstructionQueuesInitProgress = 0;
long totalProcessed = 0;
long sleepDuration =
Math.max(1, Math.min(numBlocksPerIteration/1000, 10000));
@ -3067,7 +3074,7 @@ public class BlockManager implements BlockStatsMXBean {
totalProcessed += processed;
// there is a possibility that if any of the blocks deleted/added during
// initialisation, then progress might be different.
replicationQueuesInitProgress = Math.min((double) totalProcessed
reconstructionQueuesInitProgress = Math.min((double) totalProcessed
/ totalBlocks, 1.0);
if (!blocksItr.hasNext()) {
@ -3097,12 +3104,12 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
* Get the progress of the Replication queues initialisation
* Get the progress of the reconstruction queues initialisation
*
* @return Returns values between 0 and 1 for the progress.
*/
public double getReplicationQueuesInitProgress() {
return replicationQueuesInitProgress;
public double getReconstructionQueuesInitProgress() {
return reconstructionQueuesInitProgress;
}
/**
@ -3134,15 +3141,16 @@ public class BlockManager implements BlockStatsMXBean {
short expectedReplication = getExpectedReplicaNum(block);
NumberReplicas num = countNodes(block);
final int numCurrentReplica = num.liveReplicas();
// add to under-replicated queue if need to be
if (isNeededReplication(block, numCurrentReplica)) {
if (neededReplications.add(block, numCurrentReplica, num.readOnlyReplicas(),
num.decommissionedAndDecommissioning(), expectedReplication)) {
// add to low redundancy queue if need to be
if (isNeededReconstruction(block, numCurrentReplica)) {
if (neededReconstruction.add(block, numCurrentReplica,
num.readOnlyReplicas(), num.decommissionedAndDecommissioning(),
expectedReplication)) {
return MisReplicationResult.UNDER_REPLICATED;
}
}
if (shouldProcessOverReplicated(num, expectedReplication)) {
if (shouldProcessExtraRedundancy(num, expectedReplication)) {
if (num.replicasOnStaleNodes() > 0) {
// If any of the replicas of this block are on nodes that are
// considered "stale", then these replicas may in fact have
@ -3152,8 +3160,8 @@ public class BlockManager implements BlockStatsMXBean {
return MisReplicationResult.POSTPONE;
}
// over-replicated block
processOverReplicatedBlock(block, expectedReplication, null, null);
// extra redundancy block
processExtraRedundancyBlock(block, expectedReplication, null, null);
return MisReplicationResult.OVER_REPLICATED;
}
@ -3167,12 +3175,12 @@ public class BlockManager implements BlockStatsMXBean {
return;
}
// update needReplication priority queues
// update neededReconstruction priority queues
b.setReplication(newRepl);
updateNeededReplications(b, 0, newRepl - oldRepl);
updateNeededReconstructions(b, 0, newRepl - oldRepl);
if (oldRepl > newRepl) {
processOverReplicatedBlock(b, newRepl, null, null);
processExtraRedundancyBlock(b, newRepl, null, null);
}
}
@ -3181,7 +3189,7 @@ public class BlockManager implements BlockStatsMXBean {
* If there are any extras, call chooseExcessReplicates() to
* mark them in the excessReplicateMap.
*/
private void processOverReplicatedBlock(final BlockInfo block,
private void processExtraRedundancyBlock(final BlockInfo block,
final short replication, final DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint) {
assert namesystem.hasWriteLock();
@ -3405,7 +3413,7 @@ public class BlockManager implements BlockStatsMXBean {
//
if (!storedBlock.isDeleted()) {
bmSafeMode.decrementSafeBlockCount(storedBlock);
updateNeededReplications(storedBlock, -1, 0);
updateNeededReconstructions(storedBlock, -1, 0);
}
excessReplicas.remove(node, storedBlock);
@ -3748,29 +3756,29 @@ public class BlockManager implements BlockStatsMXBean {
/**
* On stopping decommission, check if the node has excess replicas.
* If there are any excess replicas, call processOverReplicatedBlock().
* Process over replicated blocks only when active NN is out of safe mode.
* If there are any excess replicas, call processExtraRedundancyBlock().
* Process extra redundancy blocks only when active NN is out of safe mode.
*/
void processOverReplicatedBlocksOnReCommission(
void processExtraRedundancyBlocksOnReCommission(
final DatanodeDescriptor srcNode) {
if (!isPopulatingReplQueues()) {
return;
}
final Iterator<BlockInfo> it = srcNode.getBlockIterator();
int numOverReplicated = 0;
int numExtraRedundancy = 0;
while(it.hasNext()) {
final BlockInfo block = it.next();
int expectedReplication = this.getReplication(block);
NumberReplicas num = countNodes(block);
if (shouldProcessOverReplicated(num, expectedReplication)) {
// over-replicated block
processOverReplicatedBlock(block, (short) expectedReplication, null,
if (shouldProcessExtraRedundancy(num, expectedReplication)) {
// extra redundancy block
processExtraRedundancyBlock(block, (short) expectedReplication, null,
null);
numOverReplicated++;
numExtraRedundancy++;
}
}
LOG.info("Invalidated " + numOverReplicated + " over-replicated blocks on " +
srcNode + " during recommissioning");
LOG.info("Invalidated " + numExtraRedundancy
+ " extra redundancy blocks on " + srcNode + " during recommissioning");
}
/**
@ -3789,9 +3797,9 @@ public class BlockManager implements BlockStatsMXBean {
updateState();
if (pendingReplicationBlocksCount == 0 &&
underReplicatedBlocksCount == 0) {
LOG.info("Node {} is dead and there are no under-replicated" +
" blocks or blocks pending replication. Safe to decommission.",
lowRedundancyBlocksCount == 0) {
LOG.info("Node {} is dead and there are no low redundancy" +
" blocks or blocks pending reconstruction. Safe to decommission.",
node);
return true;
}
@ -3835,9 +3843,9 @@ public class BlockManager implements BlockStatsMXBean {
block.setNumBytes(BlockCommand.NO_ACK);
addToInvalidates(block);
removeBlockFromMap(block);
// Remove the block from pendingReplications and neededReplications
// Remove the block from pendingReplications and neededReconstruction
pendingReplications.remove(block);
neededReplications.remove(block, UnderReplicatedBlocks.LEVEL);
neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
if (postponedMisreplicatedBlocks.remove(block)) {
postponedMisreplicatedBlocksCount.decrementAndGet();
}
@ -3859,8 +3867,8 @@ public class BlockManager implements BlockStatsMXBean {
new Block(BlockIdManager.convertToStripedID(block.getBlockId())));
}
/** updates a block in under replication queue */
private void updateNeededReplications(final BlockInfo block,
/** updates a block in needed reconstruction queue. */
private void updateNeededReconstructions(final BlockInfo block,
final int curReplicasDelta, int expectedReplicasDelta) {
namesystem.writeLock();
try {
@ -3869,14 +3877,14 @@ public class BlockManager implements BlockStatsMXBean {
}
NumberReplicas repl = countNodes(block);
int curExpectedReplicas = getReplication(block);
if (isNeededReplication(block, repl.liveReplicas())) {
neededReplications.update(block, repl.liveReplicas(), repl.readOnlyReplicas(),
repl.decommissionedAndDecommissioning(), curExpectedReplicas,
curReplicasDelta, expectedReplicasDelta);
if (isNeededReconstruction(block, repl.liveReplicas())) {
neededReconstruction.update(block, repl.liveReplicas(),
repl.readOnlyReplicas(), repl.decommissionedAndDecommissioning(),
curExpectedReplicas, curReplicasDelta, expectedReplicasDelta);
} else {
int oldReplicas = repl.liveReplicas()-curReplicasDelta;
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
neededReplications.remove(block, oldReplicas, repl.readOnlyReplicas(),
neededReconstruction.remove(block, oldReplicas, repl.readOnlyReplicas(),
repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
}
} finally {
@ -3885,10 +3893,10 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
* Check replication of the blocks in the collection.
* If any block is needed replication, insert it into the replication queue.
* Check sufficient redundancy of the blocks in the collection. If any block
* is needed reconstruction, insert it into the reconstruction queue.
* Otherwise, if the block is more than the expected replication factor,
* process it as an over replicated block.
* process it as an extra redundancy block.
*/
public void checkReplication(BlockCollection bc) {
for (BlockInfo block : bc.getBlocks()) {
@ -3896,11 +3904,11 @@ public class BlockManager implements BlockStatsMXBean {
final NumberReplicas n = countNodes(block);
final int pending = pendingReplications.getNumReplicas(block);
if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) {
neededReplications.add(block, n.liveReplicas() + pending,
neededReconstruction.add(block, n.liveReplicas() + pending,
n.readOnlyReplicas(),
n.decommissionedAndDecommissioning(), expected);
} else if (shouldProcessOverReplicated(n, expected)) {
processOverReplicatedBlock(block, expected, null, null);
} else if (shouldProcessExtraRedundancy(n, expected)) {
processExtraRedundancyBlock(block, expected, null, null);
}
}
}
@ -3926,7 +3934,7 @@ public class BlockManager implements BlockStatsMXBean {
try {
// blocks should not be replicated or removed if safe mode is on
if (namesystem.isInSafeMode()) {
LOG.debug("In safemode, not computing replication work");
LOG.debug("In safemode, not computing reconstruction work");
return 0;
}
try {
@ -3980,10 +3988,10 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
* A block needs replication if the number of replicas is less than expected
* or if it does not have enough racks.
* A block needs reconstruction if the number of replicas is less than
* expected or if it does not have enough racks.
*/
boolean isNeededReplication(BlockInfo storedBlock, int current) {
boolean isNeededReconstruction(BlockInfo storedBlock, int current) {
int expected = getExpectedReplicaNum(storedBlock);
return storedBlock.isComplete()
&& (current < expected || !isPlacementPolicySatisfied(storedBlock));
@ -3997,12 +4005,12 @@ public class BlockManager implements BlockStatsMXBean {
public long getMissingBlocksCount() {
// not locking
return this.neededReplications.getCorruptBlockSize();
return this.neededReconstruction.getCorruptBlockSize();
}
public long getMissingReplOneBlocksCount() {
// not locking
return this.neededReplications.getCorruptReplOneBlockSize();
return this.neededReconstruction.getCorruptReplOneBlockSize();
}
public BlockInfo addBlockCollection(BlockInfo block,
@ -4050,8 +4058,8 @@ public class BlockManager implements BlockStatsMXBean {
* Return an iterator over the set of blocks for which there are no replicas.
*/
public Iterator<BlockInfo> getCorruptReplicaBlockIterator() {
return neededReplications.iterator(
UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
return neededReconstruction.iterator(
LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
}
/**
@ -4070,7 +4078,7 @@ public class BlockManager implements BlockStatsMXBean {
/** @return the size of UnderReplicatedBlocks */
public int numOfUnderReplicatedBlocks() {
return neededReplications.size();
return neededReconstruction.size();
}
/**
@ -4232,7 +4240,7 @@ public class BlockManager implements BlockStatsMXBean {
* this NameNode.
*/
public void clearQueues() {
neededReplications.clear();
neededReconstruction.clear();
pendingReplications.clear();
excessReplicas.clear();
invalidateBlocks.clear();
@ -4298,7 +4306,7 @@ public class BlockManager implements BlockStatsMXBean {
}
public void shutdown() {
stopReplicationInitializer();
stopReconstructionInitializer();
blocksMap.close();
MBeans.unregister(mxBeanName);
mxBeanName = null;

View File

@ -215,10 +215,10 @@ public class DecommissionManager {
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
// Update DN stats maintained by HeartbeatManager
hbManager.stopDecommission(node);
// Over-replicated blocks will be detected and processed when
// extra redundancy blocks will be detected and processed when
// the dead node comes back and send in its full block report.
if (node.isAlive()) {
blockManager.processOverReplicatedBlocksOnReCommission(node);
blockManager.processExtraRedundancyBlocksOnReCommission(node);
}
// Remove from tracking in DecommissionManager
pendingNodes.remove(node);
@ -513,9 +513,9 @@ public class DecommissionManager {
final List<BlockInfo> insufficientList,
boolean pruneReliableBlocks) {
boolean firstReplicationLog = true;
int underReplicatedBlocks = 0;
int lowRedundancyBlocks = 0;
int decommissionOnlyReplicas = 0;
int underReplicatedInOpenFiles = 0;
int lowRedundancyInOpenFiles = 0;
while (it.hasNext()) {
numBlocksChecked++;
final BlockInfo block = it.next();
@ -537,22 +537,22 @@ public class DecommissionManager {
final NumberReplicas num = blockManager.countNodes(block);
final int liveReplicas = num.liveReplicas();
// Schedule under-replicated blocks for replication if not already
// Schedule low redundancy blocks for reconstruction if not already
// pending
if (blockManager.isNeededReplication(block, liveReplicas)) {
if (!blockManager.neededReplications.contains(block) &&
if (blockManager.isNeededReconstruction(block, liveReplicas)) {
if (!blockManager.neededReconstruction.contains(block) &&
blockManager.pendingReplications.getNumReplicas(block) == 0 &&
blockManager.isPopulatingReplQueues()) {
// Process these blocks only when active NN is out of safe mode.
blockManager.neededReplications.add(block,
blockManager.neededReconstruction.add(block,
liveReplicas, num.readOnlyReplicas(),
num.decommissionedAndDecommissioning(),
blockManager.getExpectedReplicaNum(block));
}
}
// Even if the block is under-replicated,
// it doesn't block decommission if it's sufficiently replicated
// Even if the block is without sufficient redundancy,
// it doesn't block decommission if has sufficient redundancy
if (isSufficient(block, bc, num)) {
if (pruneReliableBlocks) {
it.remove();
@ -560,7 +560,7 @@ public class DecommissionManager {
continue;
}
// We've found an insufficiently replicated block.
// We've found a block without sufficient redundancy.
if (insufficientList != null) {
insufficientList.add(block);
}
@ -571,18 +571,18 @@ public class DecommissionManager {
firstReplicationLog = false;
}
// Update various counts
underReplicatedBlocks++;
lowRedundancyBlocks++;
if (bc.isUnderConstruction()) {
underReplicatedInOpenFiles++;
lowRedundancyInOpenFiles++;
}
if ((liveReplicas == 0) && (num.decommissionedAndDecommissioning() > 0)) {
decommissionOnlyReplicas++;
}
}
datanode.decommissioningStatus.set(underReplicatedBlocks,
datanode.decommissioningStatus.set(lowRedundancyBlocks,
decommissionOnlyReplicas,
underReplicatedInOpenFiles);
lowRedundancyInOpenFiles);
}
}

View File

@ -26,9 +26,9 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
/**
* Keep prioritized queues of under replicated blocks.
* Blocks have replication priority, with priority {@link #QUEUE_HIGHEST_PRIORITY}
* indicating the highest priority.
* Keep prioritized queues of low redundant blocks.
* Blocks have redundancy priority, with priority
* {@link #QUEUE_HIGHEST_PRIORITY} indicating the highest priority.
* </p>
* Having a prioritised queue allows the {@link BlockManager} to select
* which blocks to replicate first -it tries to give priority to data
@ -40,19 +40,19 @@ import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
* </p>
* <p>The queue order is as follows:</p>
* <ol>
* <li>{@link #QUEUE_HIGHEST_PRIORITY}: the blocks that must be replicated
* <li>{@link #QUEUE_HIGHEST_PRIORITY}: the blocks that should be redundant
* first. That is blocks with only one copy, or blocks with zero live
* copies but a copy in a node being decommissioned. These blocks
* are at risk of loss if the disk or server on which they
* remain fails.</li>
* <li>{@link #QUEUE_VERY_UNDER_REPLICATED}: blocks that are very
* <li>{@link #QUEUE_VERY_LOW_REDUNDANCY}: blocks that are very
* under-replicated compared to their expected values. Currently
* that means the ratio of the ratio of actual:expected means that
* there is <i>less than</i> 1:3.</li>. These blocks may not be at risk,
* but they are clearly considered "important".
* <li>{@link #QUEUE_UNDER_REPLICATED}: blocks that are also under
* <li>{@link #QUEUE_LOW_REDUNDANCY}: blocks that are also under
* replicated, and the ratio of actual:expected is good enough that
* they do not need to go into the {@link #QUEUE_VERY_UNDER_REPLICATED}
* they do not need to go into the {@link #QUEUE_VERY_LOW_REDUNDANCY}
* queue.</li>
* <li>{@link #QUEUE_REPLICAS_BADLY_DISTRIBUTED}: there are as least as
* many copies of a block as required, but the blocks are not adequately
@ -63,15 +63,17 @@ import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
* blocks that are not corrupt higher priority.</li>
* </ol>
*/
class UnderReplicatedBlocks implements Iterable<BlockInfo> {
class LowRedundancyBlocks implements Iterable<BlockInfo> {
/** The total number of queues : {@value} */
static final int LEVEL = 5;
/** The queue with the highest priority: {@value} */
static final int QUEUE_HIGHEST_PRIORITY = 0;
/** The queue for blocks that are way below their expected value : {@value} */
static final int QUEUE_VERY_UNDER_REPLICATED = 1;
/** The queue for "normally" under-replicated blocks: {@value} */
static final int QUEUE_UNDER_REPLICATED = 2;
static final int QUEUE_VERY_LOW_REDUNDANCY = 1;
/**
* The queue for "normally" without sufficient redundancy blocks : {@value}.
*/
static final int QUEUE_LOW_REDUNDANCY = 2;
/** The queue for blocks that have the right number of replicas,
* but which the block manager felt were badly distributed: {@value}
*/
@ -86,7 +88,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
private int corruptReplOneBlocks = 0;
/** Create an object. */
UnderReplicatedBlocks() {
LowRedundancyBlocks() {
for (int i = 0; i < LEVEL; i++) {
priorityQueues.add(new LightWeightLinkedSet<BlockInfo>());
}
@ -102,7 +104,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
corruptReplOneBlocks = 0;
}
/** Return the total number of under replication blocks */
/** Return the total number of insufficient redundancy blocks. */
synchronized int size() {
int size = 0;
for (int i = 0; i < LEVEL; i++) {
@ -111,8 +113,11 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
return size;
}
/** Return the number of under replication blocks excluding corrupt blocks */
synchronized int getUnderReplicatedBlockCount() {
/**
* Return the number of insufficiently redundant blocks excluding corrupt
* blocks.
*/
synchronized int getLowRedundancyBlockCount() {
int size = 0;
for (int i = 0; i < LEVEL; i++) {
if (i != QUEUE_WITH_CORRUPT_BLOCKS) {
@ -132,7 +137,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
return corruptReplOneBlocks;
}
/** Check if a block is in the neededReplication queue */
/** Check if a block is in the neededReconstruction queue. */
synchronized boolean contains(BlockInfo block) {
for(LightWeightLinkedSet<BlockInfo> set : priorityQueues) {
if (set.contains(block)) {
@ -187,12 +192,12 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
// highest priority
return QUEUE_HIGHEST_PRIORITY;
} else if ((curReplicas * 3) < expectedReplicas) {
//there is less than a third as many blocks as requested;
//this is considered very under-replicated
return QUEUE_VERY_UNDER_REPLICATED;
//can only afford one replica loss
//this is considered very insufficiently redundant blocks.
return QUEUE_VERY_LOW_REDUNDANCY;
} else {
//add to the normal queue for under replicated blocks
return QUEUE_UNDER_REPLICATED;
//add to the normal queue for insufficiently redundant blocks
return QUEUE_LOW_REDUNDANCY;
}
}
@ -208,17 +213,19 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
// highest risk of loss, highest priority
return QUEUE_HIGHEST_PRIORITY;
} else if ((curReplicas - dataBlkNum) * 3 < parityBlkNum + 1) {
// there is less than a third as many blocks as requested;
// this is considered very under-replicated
return QUEUE_VERY_UNDER_REPLICATED;
// can only afford one replica loss
// this is considered very insufficiently redundant blocks.
return QUEUE_VERY_LOW_REDUNDANCY;
} else {
// add to the normal queue for under replicated blocks
return QUEUE_UNDER_REPLICATED;
// add to the normal queue for insufficiently redundant blocks.
return QUEUE_LOW_REDUNDANCY;
}
}
/** add a block to a under replication queue according to its priority
* @param block a under replication block
/**
* Add a block to insufficiently redundant queue according to its priority.
*
* @param block a low redundancy block
* @param curReplicas current number of replicas of the block
* @param decomissionedReplicas the number of decommissioned replicas
* @param expectedReplicas expected number of replicas of the block
@ -238,17 +245,17 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
corruptReplOneBlocks++;
}
NameNode.blockStateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.add: {}"
+ " has only {} replicas and need {} replicas so is added to" +
" neededReplications at priority level {}", block, curReplicas,
expectedReplicas, priLevel);
"BLOCK* NameSystem.LowRedundancyBlock.add: {}"
+ " has only {} replicas and need {} replicas so is added to"
+ " neededReconstructions at priority level {}",
block, curReplicas, expectedReplicas, priLevel);
return true;
}
return false;
}
/** remove a block from a under replication queue */
/** Remove a block from a low redundancy queue. */
synchronized boolean remove(BlockInfo block,
int oldReplicas,
int oldReadOnlyReplicas,
@ -269,7 +276,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
}
/**
* Remove a block from the under replication queues.
* Remove a block from the low redundancy queues.
*
* The priLevel parameter is a hint of which queue to query
* first: if negative or &gt;= {@link #LEVEL} this shortcutting
@ -281,14 +288,16 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
* <i>Warning:</i> This is not a synchronized method.
* @param block block to remove
* @param priLevel expected privilege level
* @return true if the block was found and removed from one of the priority queues
* @return true if the block was found and removed from one of the priority
* queues
*/
boolean remove(BlockInfo block, int priLevel) {
if(priLevel >= 0 && priLevel < LEVEL
&& priorityQueues.get(priLevel).remove(block)) {
NameNode.blockStateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block {}" +
" from priority queue {}", block, priLevel);
"BLOCK* NameSystem.LowRedundancyBlock.remove: Removing block {}"
+ " from priority queue {}",
block, priLevel);
return true;
} else {
// Try to remove the block from all queues if the block was
@ -296,7 +305,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
for (int i = 0; i < LEVEL; i++) {
if (i != priLevel && priorityQueues.get(i).remove(block)) {
NameNode.blockStateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block" +
"BLOCK* NameSystem.LowRedundancyBlock.remove: Removing block" +
" {} from priority queue {}", block, i);
return true;
}
@ -314,12 +323,13 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
* to add it to that queue. This ensures that the block will be
* in its expected priority queue (and only that queue) by the end of the
* method call.
* @param block a under replicated block
* @param block a low redundancy block
* @param curReplicas current number of replicas of the block
* @param decommissionedReplicas the number of decommissioned replicas
* @param curExpectedReplicas expected number of replicas of the block
* @param curReplicasDelta the change in the replicate count from before
* @param expectedReplicasDelta the change in the expected replica count from before
* @param expectedReplicasDelta the change in the expected replica count
* from before
*/
synchronized void update(BlockInfo block, int curReplicas,
int readOnlyReplicas, int decommissionedReplicas,
@ -332,7 +342,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
int oldPri = getPriority(block, oldReplicas, readOnlyReplicas,
decommissionedReplicas, oldExpectedReplicas);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " +
NameNode.stateChangeLog.debug("LowRedundancyBlocks.update " +
block +
" curReplicas " + curReplicas +
" curExpectedReplicas " + curExpectedReplicas +
@ -346,10 +356,10 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
}
if(priorityQueues.get(curPri).add(block)) {
NameNode.blockStateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.update: {} has only {} " +
"replicas and needs {} replicas so is added to " +
"neededReplications at priority level {}", block, curReplicas,
curExpectedReplicas, curPri);
"BLOCK* NameSystem.LowRedundancyBlock.update: {} has only {} "
+ "replicas and needs {} replicas so is added to "
+ "neededReconstructions at priority level {}",
block, curReplicas, curExpectedReplicas, curPri);
}
if (oldPri != curPri || expectedReplicasDelta != 0) {
@ -365,25 +375,25 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
}
}
}
/**
* Get a list of block lists to be replicated. The index of block lists
* represents its replication priority. Iterates each block list in priority
* order beginning with the highest priority list. Iterators use a bookmark to
* resume where the previous iteration stopped. Returns when the block count
* is met or iteration reaches the end of the lowest priority list, in which
* case bookmarks for each block list are reset to the heads of their
* respective lists.
* Get a list of block lists without sufficient redundancy. The index of
* block lists represents its replication priority. Iterates each block list
* in priority order beginning with the highest priority list. Iterators use
* a bookmark to resume where the previous iteration stopped. Returns when
* the block count is met or iteration reaches the end of the lowest priority
* list, in which case bookmarks for each block list are reset to the heads
* of their respective lists.
*
* @param blocksToProcess - number of blocks to fetch from underReplicated
* @param blocksToProcess - number of blocks to fetch from low redundancy
* blocks.
* @return Return a list of block lists to be replicated. The block list index
* represents its replication priority.
* @return Return a list of block lists to be replicated. The block list
* index represents its redundancy priority.
*/
synchronized List<List<BlockInfo>> chooseUnderReplicatedBlocks(
synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks(
int blocksToProcess) {
final List<List<BlockInfo>> blocksToReplicate = new ArrayList<>(LEVEL);
final List<List<BlockInfo>> blocksToReconstruct = new ArrayList<>(LEVEL);
int count = 0;
int priority = 0;
for (; count < blocksToProcess && priority < LEVEL; priority++) {
@ -392,11 +402,11 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
continue;
}
// Go through all blocks that need replications with current priority.
// Set the iterator to the first unprocessed block at this priority level.
// Go through all blocks that need reconstructions with current priority.
// Set the iterator to the first unprocessed block at this priority level
final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
final List<BlockInfo> blocks = new LinkedList<>();
blocksToReplicate.add(blocks);
blocksToReconstruct.add(blocks);
// Loop through all remaining blocks in the list.
for(; count < blocksToProcess && i.hasNext(); count++) {
blocks.add(i.next());
@ -410,15 +420,15 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
}
}
return blocksToReplicate;
return blocksToReconstruct;
}
/** returns an iterator of all blocks in a given priority queue */
/** Returns an iterator of all blocks in a given priority queue. */
synchronized Iterator<BlockInfo> iterator(int level) {
return priorityQueues.get(level).iterator();
}
/** return an iterator of all the under replication blocks */
/** Return an iterator of all the low redundancy blocks. */
@Override
public synchronized Iterator<BlockInfo> iterator() {
final Iterator<LightWeightLinkedSet<BlockInfo>> q = priorityQueues.iterator();
@ -445,4 +455,4 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
}
};
}
}
}

View File

@ -74,7 +74,7 @@ public class BlockManagerTestUtil {
final BlockInfo storedBlock = bm.getStoredBlock(b);
return new int[]{getNumberOfRacks(bm, b),
bm.countNodes(storedBlock).liveReplicas(),
bm.neededReplications.contains(storedBlock) ? 1 : 0};
bm.neededReconstruction.contains(storedBlock) ? 1 : 0};
} finally {
namesystem.readUnlock();
}

View File

@ -397,20 +397,20 @@ public class TestBlockManager {
addNodes(nodes);
List<DatanodeDescriptor> origNodes = nodes.subList(0, 3);
for (int i = 0; i < NUM_TEST_ITERS; i++) {
doTestSingleRackClusterIsSufficientlyReplicated(i, origNodes);
doTestSingleRackClusterHasSufficientRedundancy(i, origNodes);
}
}
private void doTestSingleRackClusterIsSufficientlyReplicated(int testIndex,
private void doTestSingleRackClusterHasSufficientRedundancy(int testIndex,
List<DatanodeDescriptor> origNodes)
throws Exception {
assertEquals(0, bm.numOfUnderReplicatedBlocks());
BlockInfo block = addBlockOnNodes(testIndex, origNodes);
assertFalse(bm.isNeededReplication(block, bm.countLiveNodes(block)));
assertFalse(bm.isNeededReconstruction(block, bm.countLiveNodes(block)));
}
@Test(timeout = 60000)
public void testNeededReplicationWhileAppending() throws IOException {
public void testNeededReconstructionWhileAppending() throws IOException {
Configuration conf = new HdfsConfiguration();
String src = "/test-file";
Path file = new Path(src);
@ -449,7 +449,7 @@ public class TestBlockManager {
namenode.updatePipeline(clientName, oldBlock, newBlock,
newLocatedBlock.getLocations(), newLocatedBlock.getStorageIDs());
BlockInfo bi = bm.getStoredBlock(newBlock.getLocalBlock());
assertFalse(bm.isNeededReplication(bi, bm.countLiveNodes(bi)));
assertFalse(bm.isNeededReconstruction(bi, bm.countLiveNodes(bi)));
} finally {
IOUtils.closeStream(out);
}
@ -601,7 +601,7 @@ public class TestBlockManager {
liveNodes,
new NumberReplicas(),
new ArrayList<Byte>(),
UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY)[0]);
assertEquals("Does not choose a source node for a less-than-highest-priority"
+ " replication since all available source nodes have reached"
@ -612,7 +612,7 @@ public class TestBlockManager {
liveNodes,
new NumberReplicas(),
new ArrayList<Byte>(),
UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).length);
LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY).length);
// Increase the replication count to test replication count > hard limit
DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] };
@ -626,7 +626,7 @@ public class TestBlockManager {
liveNodes,
new NumberReplicas(),
new ArrayList<Byte>(),
UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).length);
LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY).length);
}
@Test
@ -652,7 +652,7 @@ public class TestBlockManager {
cntNodes,
liveNodes,
new NumberReplicas(), new LinkedList<Byte>(),
UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED)[0]);
LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY)[0]);
// Increase the replication count to test replication count > hard limit
@ -666,7 +666,7 @@ public class TestBlockManager {
cntNodes,
liveNodes,
new NumberReplicas(), new LinkedList<Byte>(),
UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).length);
LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY).length);
}
@Test

View File

@ -30,7 +30,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
public class TestUnderReplicatedBlockQueues {
public class TestLowRedundancyBlockQueues {
private final ErasureCodingPolicy ecPolicy =
ErasureCodingPolicyManager.getSystemDefaultPolicy();
@ -52,39 +52,39 @@ public class TestUnderReplicatedBlockQueues {
*/
@Test
public void testBlockPriorities() throws Throwable {
UnderReplicatedBlocks queues = new UnderReplicatedBlocks();
LowRedundancyBlocks queues = new LowRedundancyBlocks();
BlockInfo block1 = genBlockInfo(1);
BlockInfo block2 = genBlockInfo(2);
BlockInfo block_very_under_replicated = genBlockInfo(3);
BlockInfo block_very_low_redundancy = genBlockInfo(3);
BlockInfo block_corrupt = genBlockInfo(4);
BlockInfo block_corrupt_repl_one = genBlockInfo(5);
//add a block with a single entry
assertAdded(queues, block1, 1, 0, 3);
assertEquals(1, queues.getUnderReplicatedBlockCount());
assertEquals(1, queues.getLowRedundancyBlockCount());
assertEquals(1, queues.size());
assertInLevel(queues, block1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);
assertInLevel(queues, block1, LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY);
//repeated additions fail
assertFalse(queues.add(block1, 1, 0, 0, 3));
//add a second block with two replicas
assertAdded(queues, block2, 2, 0, 3);
assertEquals(2, queues.getUnderReplicatedBlockCount());
assertEquals(2, queues.getLowRedundancyBlockCount());
assertEquals(2, queues.size());
assertInLevel(queues, block2, UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED);
assertInLevel(queues, block2, LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY);
//now try to add a block that is corrupt
assertAdded(queues, block_corrupt, 0, 0, 3);
assertEquals(3, queues.size());
assertEquals(2, queues.getUnderReplicatedBlockCount());
assertEquals(2, queues.getLowRedundancyBlockCount());
assertEquals(1, queues.getCorruptBlockSize());
assertInLevel(queues, block_corrupt,
UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
//insert a very under-replicated block
assertAdded(queues, block_very_under_replicated, 4, 0, 25);
assertInLevel(queues, block_very_under_replicated,
UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED);
//insert a very insufficiently redundancy block
assertAdded(queues, block_very_low_redundancy, 4, 0, 25);
assertInLevel(queues, block_very_low_redundancy,
LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY);
//insert a corrupt block with replication factor 1
assertAdded(queues, block_corrupt_repl_one, 0, 0, 1);
@ -94,7 +94,7 @@ public class TestUnderReplicatedBlockQueues {
assertEquals(0, queues.getCorruptReplOneBlockSize());
queues.update(block_corrupt, 0, 0, 0, 1, 0, -2);
assertEquals(1, queues.getCorruptReplOneBlockSize());
queues.update(block_very_under_replicated, 0, 0, 0, 1, -4, -24);
queues.update(block_very_low_redundancy, 0, 0, 0, 1, -4, -24);
assertEquals(2, queues.getCorruptReplOneBlockSize());
}
@ -110,26 +110,26 @@ public class TestUnderReplicatedBlockQueues {
throws Throwable {
int groupSize = dataBlkNum + parityBlkNum;
long numBytes = ecPolicy.getCellSize() * dataBlkNum;
UnderReplicatedBlocks queues = new UnderReplicatedBlocks();
LowRedundancyBlocks queues = new LowRedundancyBlocks();
int numUR = 0;
int numCorrupt = 0;
// add under replicated blocks
// add low redundancy blocks
for (int i = 0; dataBlkNum + i < groupSize; i++) {
BlockInfo block = genStripedBlockInfo(-100 - 100 * i, numBytes);
assertAdded(queues, block, dataBlkNum + i, 0, groupSize);
numUR++;
assertEquals(numUR, queues.getUnderReplicatedBlockCount());
assertEquals(numUR, queues.getLowRedundancyBlockCount());
assertEquals(numUR + numCorrupt, queues.size());
if (i == 0) {
assertInLevel(queues, block,
UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);
LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY);
} else if (i * 3 < parityBlkNum + 1) {
assertInLevel(queues, block,
UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED);
LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY);
} else {
assertInLevel(queues, block,
UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED);
LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY);
}
}
@ -139,13 +139,13 @@ public class TestUnderReplicatedBlockQueues {
assertAdded(queues, block_corrupt, dataBlkNum - 1, 0, groupSize);
numCorrupt++;
assertEquals(numUR + numCorrupt, queues.size());
assertEquals(numUR, queues.getUnderReplicatedBlockCount());
assertEquals(numUR, queues.getLowRedundancyBlockCount());
assertEquals(numCorrupt, queues.getCorruptBlockSize());
assertInLevel(queues, block_corrupt,
UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
}
private void assertAdded(UnderReplicatedBlocks queues,
private void assertAdded(LowRedundancyBlocks queues,
BlockInfo block,
int curReplicas,
int decomissionedReplicas,
@ -167,7 +167,7 @@ public class TestUnderReplicatedBlockQueues {
* @param block block to look for
* @param level level to select
*/
private void assertInLevel(UnderReplicatedBlocks queues,
private void assertInLevel(LowRedundancyBlocks queues,
Block block,
int level) {
final Iterator<BlockInfo> bi = queues.iterator(level);

View File

@ -183,7 +183,7 @@ public class TestPendingReplication {
PendingReplicationBlocks pendingReplications =
blkManager.pendingReplications;
UnderReplicatedBlocks neededReplications = blkManager.neededReplications;
LowRedundancyBlocks neededReconstruction = blkManager.neededReconstruction;
BlocksMap blocksMap = blkManager.blocksMap;
//
@ -227,9 +227,9 @@ public class TestPendingReplication {
}
//
// Verify that block moves to neededReplications
// Verify that block moves to neededReconstruction
//
while (neededReplications.size() == 0) {
while (neededReconstruction.size() == 0) {
try {
Thread.sleep(100);
} catch (Exception e) {
@ -238,14 +238,14 @@ public class TestPendingReplication {
// Verify that the generation stamp we will try to replicate
// is now 1
for (Block b: neededReplications) {
for (Block b: neededReconstruction) {
assertEquals("Generation stamp is 1 ", 1,
b.getGenerationStamp());
}
// Verify size of neededReplications is exactly 1.
assertEquals("size of neededReplications is 1 ", 1,
neededReplications.size());
// Verify size of neededReconstruction is exactly 1.
assertEquals("size of neededReconstruction is 1 ", 1,
neededReconstruction.size());
} finally {
if (cluster != null) {
cluster.shutdown();

View File

@ -836,12 +836,12 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
.format(true).build();
try {
cluster.waitActive();
final UnderReplicatedBlocks neededReplications = cluster.getNameNode()
.getNamesystem().getBlockManager().neededReplications;
final LowRedundancyBlocks neededReconstruction = cluster.getNameNode()
.getNamesystem().getBlockManager().neededReconstruction;
for (int i = 0; i < 100; i++) {
// Adding the blocks directly to normal priority
neededReplications.add(genBlockInfo(ThreadLocalRandom.current().
neededReconstruction.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 2, 0, 0, 3);
}
// Lets wait for the replication interval, to start process normal
@ -849,7 +849,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
// Adding the block directly to high priority list
neededReplications.add(genBlockInfo(ThreadLocalRandom.current().
neededReconstruction.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 1, 0, 0, 3);
// Lets wait for the replication interval
@ -858,68 +858,68 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
// Check replication completed successfully. Need not wait till it process
// all the 100 normal blocks.
assertFalse("Not able to clear the element from high priority list",
neededReplications.iterator(HIGH_PRIORITY).hasNext());
neededReconstruction.iterator(HIGH_PRIORITY).hasNext());
} finally {
cluster.shutdown();
}
}
/**
* Test for the ChooseUnderReplicatedBlocks are processed based on priority
* Test for the ChooseLowRedundancyBlocks are processed based on priority
*/
@Test
public void testChooseUnderReplicatedBlocks() throws Exception {
UnderReplicatedBlocks underReplicatedBlocks = new UnderReplicatedBlocks();
public void testChooseLowRedundancyBlocks() throws Exception {
LowRedundancyBlocks lowRedundancyBlocks = new LowRedundancyBlocks();
for (int i = 0; i < 5; i++) {
// Adding QUEUE_HIGHEST_PRIORITY block
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
lowRedundancyBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 1, 0, 0, 3);
// Adding QUEUE_VERY_UNDER_REPLICATED block
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
// Adding QUEUE_VERY_LOW_REDUNDANCY block
lowRedundancyBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 2, 0, 0, 7);
// Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
lowRedundancyBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 6, 0, 0, 6);
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
// Adding QUEUE_LOW_REDUNDANCY block
lowRedundancyBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 5, 0, 0, 6);
// Adding QUEUE_WITH_CORRUPT_BLOCKS block
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
lowRedundancyBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 0, 0, 0, 3);
}
// Choose 6 blocks from UnderReplicatedBlocks. Then it should pick 5 blocks
// from
// QUEUE_HIGHEST_PRIORITY and 1 block from QUEUE_VERY_UNDER_REPLICATED.
// Choose 6 blocks from lowRedundancyBlocks. Then it should pick 5 blocks
// from QUEUE_HIGHEST_PRIORITY and 1 block from QUEUE_VERY_LOW_REDUNDANCY.
List<List<BlockInfo>> chosenBlocks =
underReplicatedBlocks.chooseUnderReplicatedBlocks(6);
lowRedundancyBlocks.chooseLowRedundancyBlocks(6);
assertTheChosenBlocks(chosenBlocks, 5, 1, 0, 0, 0);
// Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 4 blocks from
// QUEUE_VERY_UNDER_REPLICATED, 5 blocks from QUEUE_UNDER_REPLICATED and 1
// Choose 10 blocks from lowRedundancyBlocks. Then it should pick 4 blocks
// from QUEUE_VERY_LOW_REDUNDANCY, 5 blocks from QUEUE_LOW_REDUNDANCY and 1
// block from QUEUE_REPLICAS_BADLY_DISTRIBUTED.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(10);
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(10);
assertTheChosenBlocks(chosenBlocks, 0, 4, 5, 1, 0);
// Adding QUEUE_HIGHEST_PRIORITY
underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
lowRedundancyBlocks.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 0, 1, 0, 3);
// Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 1 block from
// QUEUE_HIGHEST_PRIORITY, 4 blocks from QUEUE_REPLICAS_BADLY_DISTRIBUTED
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(10);
// Choose 10 blocks from lowRedundancyBlocks. Then it should pick 1 block
// from QUEUE_HIGHEST_PRIORITY, 4 blocks from
// QUEUE_REPLICAS_BADLY_DISTRIBUTED
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(10);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 4);
// Since it is reached to end of all lists,
// should start picking the blocks from start.
// Choose 7 blocks from UnderReplicatedBlocks. Then it should pick 6 blocks from
// QUEUE_HIGHEST_PRIORITY, 1 block from QUEUE_VERY_UNDER_REPLICATED.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(7);
// Choose 7 blocks from lowRedundancyBlocks. Then it should pick 6 blocks
// from QUEUE_HIGHEST_PRIORITY, 1 block from QUEUE_VERY_LOW_REDUNDANCY.
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(7);
assertTheChosenBlocks(chosenBlocks, 6, 1, 0, 0, 0);
}
@ -1268,45 +1268,45 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
@Test(timeout = 60000)
public void testUpdateDoesNotCauseSkippedReplication() {
UnderReplicatedBlocks underReplicatedBlocks = new UnderReplicatedBlocks();
LowRedundancyBlocks lowRedundancyBlocks = new LowRedundancyBlocks();
BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
BlockInfo block3 = genBlockInfo(ThreadLocalRandom.current().nextLong());
// Adding QUEUE_VERY_UNDER_REPLICATED block
// Adding QUEUE_VERY_LOW_REDUNDANCY block
final int block1CurReplicas = 2;
final int block1ExpectedReplicas = 7;
underReplicatedBlocks.add(block1, block1CurReplicas, 0, 0,
lowRedundancyBlocks.add(block1, block1CurReplicas, 0, 0,
block1ExpectedReplicas);
// Adding QUEUE_VERY_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 2, 0, 0, 7);
// Adding QUEUE_VERY_LOW_REDUNDANCY block
lowRedundancyBlocks.add(block2, 2, 0, 0, 7);
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block3, 2, 0, 0, 6);
// Adding QUEUE_LOW_REDUNDANCY block
lowRedundancyBlocks.add(block3, 2, 0, 0, 6);
List<List<BlockInfo>> chosenBlocks;
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
// Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
// from QUEUE_VERY_LOW_REDUNDANCY.
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
assertTheChosenBlocks(chosenBlocks, 0, 1, 0, 0, 0);
// Increasing the replications will move the block down a
// priority. This simulates a replica being completed in between checks.
underReplicatedBlocks.update(block1, block1CurReplicas+1, 0, 0,
lowRedundancyBlocks.update(block1, block1CurReplicas+1, 0, 0,
block1ExpectedReplicas, 1, 0);
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
// Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
// from QUEUE_VERY_LOW_REDUNDANCY.
// This block was moved up a priority and should not be skipped over.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
assertTheChosenBlocks(chosenBlocks, 0, 1, 0, 0, 0);
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_UNDER_REPLICATED.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
// Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
// from QUEUE_LOW_REDUNDANCY.
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
assertTheChosenBlocks(chosenBlocks, 0, 0, 1, 0, 0);
}
@ -1317,27 +1317,27 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
when(mockNS.hasWriteLock()).thenReturn(true);
when(mockNS.hasReadLock()).thenReturn(true);
BlockManager bm = new BlockManager(mockNS, false, new HdfsConfiguration());
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
LowRedundancyBlocks lowRedundancyBlocks = bm.neededReconstruction;
BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block1, 0, 0, 1, 1);
// Adding QUEUE_LOW_REDUNDANCY block
lowRedundancyBlocks.add(block1, 0, 0, 1, 1);
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 0, 0, 1, 1);
// Adding QUEUE_LOW_REDUNDANCY block
lowRedundancyBlocks.add(block2, 0, 0, 1, 1);
List<List<BlockInfo>> chosenBlocks;
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
// Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
// from QUEUE_VERY_LOW_REDUNDANCY.
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
// Adding this block collection to the BlockManager, so that when we add the
// Adding this block collection to the BlockManager, so that when we add
// block under construction, the BlockManager will realize the expected
// replication has been achieved and remove it from the under-replicated
// replication has been achieved and remove it from the low redundancy
// queue.
BlockInfoContiguous info = new BlockInfoContiguous(block1, (short) 1);
info.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION, null);
@ -1353,9 +1353,9 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
ReplicaState.FINALIZED), storages[0]);
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
// from QUEUE_VERY_LOW_REDUNDANCY.
// This block remains and should not be skipped over.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
}
@ -1367,7 +1367,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
when(mockNS.hasWriteLock()).thenReturn(true);
BlockManager bm = new BlockManager(mockNS, false, new HdfsConfiguration());
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
LowRedundancyBlocks lowRedundancyBlocks = bm.neededReconstruction;
long blkID1 = ThreadLocalRandom.current().nextLong();
if (blkID1 < 0) {
@ -1381,17 +1381,17 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
BlockInfo block1 = genBlockInfo(blkID1);
BlockInfo block2 = genBlockInfo(blkID2);
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block1, 0, 0, 1, 1);
// Adding QUEUE_LOW_REDUNDANCY block
lowRedundancyBlocks.add(block1, 0, 0, 1, 1);
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 0, 0, 1, 1);
// Adding QUEUE_LOW_REDUNDANCY block
lowRedundancyBlocks.add(block2, 0, 0, 1, 1);
List<List<BlockInfo>> chosenBlocks;
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
// Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
// from QUEUE_VERY_LOW_REDUNDANCY.
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
final BlockInfoContiguous info = new BlockInfoContiguous(block1, (short) 1);
@ -1425,10 +1425,10 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
bm.convertLastBlockToUnderConstruction(mbc, 0L);
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
// Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
// from QUEUE_VERY_LOW_REDUNDANCY.
// This block remains and should not be skipped over.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
}
@ -1439,30 +1439,30 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
when(mockNS.hasReadLock()).thenReturn(true);
BlockManager bm = new BlockManager(mockNS, false, new HdfsConfiguration());
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
LowRedundancyBlocks lowRedundancyBlocks = bm.neededReconstruction;
BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block1, 0, 0, 1, 1);
// Adding QUEUE_LOW_REDUNDANCY block
lowRedundancyBlocks.add(block1, 0, 0, 1, 1);
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 0, 0, 1, 1);
// Adding QUEUE_LOW_REDUNDANCY block
lowRedundancyBlocks.add(block2, 0, 0, 1, 1);
List<List<BlockInfo>> chosenBlocks;
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
// Choose 1 block from lowRedundancyBlocks. Then it should pick 1 block
// from QUEUE_VERY_LOW_REDUNDANCY.
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
bm.setReplication((short)0, (short)1, block1);
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED.
// from QUEUE_VERY_LOW_REDUNDANCY.
// This block remains and should not be skipped over.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(1);
chosenBlocks = lowRedundancyBlocks.chooseLowRedundancyBlocks(1);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 0, 0);
}

View File

@ -152,7 +152,7 @@ public class TestMetaSave {
line = reader.readLine();
assertTrue(line.equals("Dead Datanodes: 1"));
line = reader.readLine();
assertTrue(line.equals("Metasave: Blocks waiting for replication: 0"));
assertTrue(line.equals("Metasave: Blocks waiting for reconstruction: 0"));
line = reader.readLine();
assertTrue(line.equals("Mis-replicated blocks that have been postponed:"));
line = reader.readLine();