HDFS-8608. Merge HDFS-7912 to trunk and branch-2 (track BlockInfo instead of Block in UnderReplicatedBlocks and PendingReplicationBlocks). Contributed by Zhe Zhang.

This commit is contained in:
Andrew Wang 2015-06-17 08:05:44 -07:00
parent ebb9a82519
commit 6e3fcffe29
20 changed files with 231 additions and 196 deletions

View File

@ -640,6 +640,10 @@ Release 2.8.0 - UNRELEASED
HDFS-7164. Feature documentation for HDFS-6581. (Arpit Agarwal) HDFS-7164. Feature documentation for HDFS-6581. (Arpit Agarwal)
HDFS-9608. Merge HDFS-7912 to trunk and branch-2 (track BlockInfo instead
of Block in UnderReplicatedBlocks and PendingReplicationBlocks).
(Zhe Zhang via wang)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -588,7 +588,7 @@ public class BlockManager {
/** /**
* @return true if the block has minimum replicas * @return true if the block has minimum replicas
*/ */
public boolean checkMinReplication(Block block) { public boolean checkMinReplication(BlockInfo block) {
return (countNodes(block).liveReplicas() >= minReplication); return (countNodes(block).liveReplicas() >= minReplication);
} }
@ -1310,7 +1310,7 @@ public class BlockManager {
* @return number of blocks scheduled for replication during this iteration. * @return number of blocks scheduled for replication during this iteration.
*/ */
int computeReplicationWork(int blocksToProcess) { int computeReplicationWork(int blocksToProcess) {
List<List<Block>> blocksToReplicate = null; List<List<BlockInfo>> blocksToReplicate = null;
namesystem.writeLock(); namesystem.writeLock();
try { try {
// Choose the blocks to be replicated // Choose the blocks to be replicated
@ -1328,7 +1328,7 @@ public class BlockManager {
* @return the number of blocks scheduled for replication * @return the number of blocks scheduled for replication
*/ */
@VisibleForTesting @VisibleForTesting
int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) { int computeReplicationWorkForBlocks(List<List<BlockInfo>> blocksToReplicate) {
int requiredReplication, numEffectiveReplicas; int requiredReplication, numEffectiveReplicas;
List<DatanodeDescriptor> containingNodes; List<DatanodeDescriptor> containingNodes;
DatanodeDescriptor srcNode; DatanodeDescriptor srcNode;
@ -1342,7 +1342,7 @@ public class BlockManager {
try { try {
synchronized (neededReplications) { synchronized (neededReplications) {
for (int priority = 0; priority < blocksToReplicate.size(); priority++) { for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
for (Block block : blocksToReplicate.get(priority)) { for (BlockInfo block : blocksToReplicate.get(priority)) {
// block should belong to a file // block should belong to a file
bc = blocksMap.getBlockCollection(block); bc = blocksMap.getBlockCollection(block);
// abandoned block or block reopened for append // abandoned block or block reopened for append
@ -1426,7 +1426,7 @@ public class BlockManager {
} }
synchronized (neededReplications) { synchronized (neededReplications) {
Block block = rw.block; BlockInfo block = rw.block;
int priority = rw.priority; int priority = rw.priority;
// Recheck since global lock was released // Recheck since global lock was released
// block should belong to a file // block should belong to a file
@ -1688,7 +1688,7 @@ public class BlockManager {
* and put them back into the neededReplication queue * and put them back into the neededReplication queue
*/ */
private void processPendingReplications() { private void processPendingReplications() {
Block[] timedOutItems = pendingReplications.getTimedOutBlocks(); BlockInfo[] timedOutItems = pendingReplications.getTimedOutBlocks();
if (timedOutItems != null) { if (timedOutItems != null) {
namesystem.writeLock(); namesystem.writeLock();
try { try {
@ -2895,13 +2895,13 @@ public class BlockManager {
/** Set replication for the blocks. */ /** Set replication for the blocks. */
public void setReplication(final short oldRepl, final short newRepl, public void setReplication(final short oldRepl, final short newRepl,
final String src, final Block... blocks) { final String src, final BlockInfo... blocks) {
if (newRepl == oldRepl) { if (newRepl == oldRepl) {
return; return;
} }
// update needReplication priority queues // update needReplication priority queues
for(Block b : blocks) { for(BlockInfo b : blocks) {
updateNeededReplications(b, 0, newRepl-oldRepl); updateNeededReplications(b, 0, newRepl-oldRepl);
} }
@ -2909,7 +2909,7 @@ public class BlockManager {
// old replication > the new one; need to remove copies // old replication > the new one; need to remove copies
LOG.info("Decreasing replication from " + oldRepl + " to " + newRepl LOG.info("Decreasing replication from " + oldRepl + " to " + newRepl
+ " for " + src); + " for " + src);
for(Block b : blocks) { for(BlockInfo b : blocks) {
processOverReplicatedBlock(b, newRepl, null, null); processOverReplicatedBlock(b, newRepl, null, null);
} }
} else { // replication factor is increased } else { // replication factor is increased
@ -3092,7 +3092,8 @@ public class BlockManager {
blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node); blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node);
assert (namesystem.hasWriteLock()); assert (namesystem.hasWriteLock());
{ {
if (!blocksMap.removeNode(block, node)) { BlockInfo storedBlock = getStoredBlock(block);
if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) {
blockLog.debug("BLOCK* removeStoredBlock: {} has already been" + blockLog.debug("BLOCK* removeStoredBlock: {} has already been" +
" removed from node {}", block, node); " removed from node {}", block, node);
return; return;
@ -3106,8 +3107,8 @@ public class BlockManager {
// //
BlockCollection bc = blocksMap.getBlockCollection(block); BlockCollection bc = blocksMap.getBlockCollection(block);
if (bc != null) { if (bc != null) {
namesystem.decrementSafeBlockCount(block); namesystem.decrementSafeBlockCount(storedBlock);
updateNeededReplications(block, -1, 0); updateNeededReplications(storedBlock, -1, 0);
} }
// //
@ -3181,7 +3182,10 @@ public class BlockManager {
// //
// Modify the blocks->datanode map and node's map. // Modify the blocks->datanode map and node's map.
// //
pendingReplications.decrement(block, node); BlockInfo storedBlock = getStoredBlock(block);
if (storedBlock != null) {
pendingReplications.decrement(getStoredBlock(block), node);
}
processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED, processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED,
delHintNode); delHintNode);
} }
@ -3293,7 +3297,7 @@ public class BlockManager {
* Return the number of nodes hosting a given block, grouped * Return the number of nodes hosting a given block, grouped
* by the state of those replicas. * by the state of those replicas.
*/ */
public NumberReplicas countNodes(Block b) { public NumberReplicas countNodes(BlockInfo b) {
int decommissioned = 0; int decommissioned = 0;
int decommissioning = 0; int decommissioning = 0;
int live = 0; int live = 0;
@ -3326,12 +3330,12 @@ public class BlockManager {
} }
/** /**
* Simpler, faster form of {@link #countNodes(Block)} that only returns the number * Simpler, faster form of {@link #countNodes} that only returns the number
* of live nodes. If in startup safemode (or its 30-sec extension period), * of live nodes. If in startup safemode (or its 30-sec extension period),
* then it gains speed by ignoring issues of excess replicas or nodes * then it gains speed by ignoring issues of excess replicas or nodes
* that are decommissioned or in process of becoming decommissioned. * that are decommissioned or in process of becoming decommissioned.
* If not in startup, then it calls {@link #countNodes(Block)} instead. * If not in startup, then it calls {@link #countNodes} instead.
* *
* @param b - the block being tested * @param b - the block being tested
* @return count of live nodes for this block * @return count of live nodes for this block
*/ */
@ -3360,10 +3364,10 @@ public class BlockManager {
if (!namesystem.isPopulatingReplQueues()) { if (!namesystem.isPopulatingReplQueues()) {
return; return;
} }
final Iterator<? extends Block> it = srcNode.getBlockIterator(); final Iterator<BlockInfo> it = srcNode.getBlockIterator();
int numOverReplicated = 0; int numOverReplicated = 0;
while(it.hasNext()) { while(it.hasNext()) {
final Block block = it.next(); final BlockInfo block = it.next();
BlockCollection bc = blocksMap.getBlockCollection(block); BlockCollection bc = blocksMap.getBlockCollection(block);
short expectedReplication = bc.getPreferredBlockReplication(); short expectedReplication = bc.getPreferredBlockReplication();
NumberReplicas num = countNodes(block); NumberReplicas num = countNodes(block);
@ -3427,7 +3431,7 @@ public class BlockManager {
return blocksMap.size(); return blocksMap.size();
} }
public void removeBlock(Block block) { public void removeBlock(BlockInfo block) {
assert namesystem.hasWriteLock(); assert namesystem.hasWriteLock();
// No need to ACK blocks that are being removed entirely // No need to ACK blocks that are being removed entirely
// from the namespace, since the removal of the associated // from the namespace, since the removal of the associated
@ -3448,7 +3452,7 @@ public class BlockManager {
} }
/** updates a block in under replication queue */ /** updates a block in under replication queue */
private void updateNeededReplications(final Block block, private void updateNeededReplications(final BlockInfo block,
final int curReplicasDelta, int expectedReplicasDelta) { final int curReplicasDelta, int expectedReplicasDelta) {
namesystem.writeLock(); namesystem.writeLock();
try { try {
@ -3480,7 +3484,7 @@ public class BlockManager {
*/ */
public void checkReplication(BlockCollection bc) { public void checkReplication(BlockCollection bc) {
final short expected = bc.getPreferredBlockReplication(); final short expected = bc.getPreferredBlockReplication();
for (Block block : bc.getBlocks()) { for (BlockInfo block : bc.getBlocks()) {
final NumberReplicas n = countNodes(block); final NumberReplicas n = countNodes(block);
if (isNeededReplication(block, expected, n.liveReplicas())) { if (isNeededReplication(block, expected, n.liveReplicas())) {
neededReplications.add(block, n.liveReplicas(), neededReplications.add(block, n.liveReplicas(),
@ -3682,7 +3686,7 @@ public class BlockManager {
/** /**
* Return an iterator over the set of blocks for which there are no replicas. * Return an iterator over the set of blocks for which there are no replicas.
*/ */
public Iterator<Block> getCorruptReplicaBlockIterator() { public Iterator<BlockInfo> getCorruptReplicaBlockIterator() {
return neededReplications.iterator( return neededReplications.iterator(
UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS); UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
} }
@ -3807,7 +3811,7 @@ public class BlockManager {
private static class ReplicationWork { private static class ReplicationWork {
private final Block block; private final BlockInfo block;
private final BlockCollection bc; private final BlockCollection bc;
private final DatanodeDescriptor srcNode; private final DatanodeDescriptor srcNode;
@ -3818,7 +3822,7 @@ public class BlockManager {
private DatanodeStorageInfo targets[]; private DatanodeStorageInfo targets[];
private final int priority; private final int priority;
public ReplicationWork(Block block, public ReplicationWork(BlockInfo block,
BlockCollection bc, BlockCollection bc,
DatanodeDescriptor srcNode, DatanodeDescriptor srcNode,
List<DatanodeDescriptor> containingNodes, List<DatanodeDescriptor> containingNodes,

View File

@ -23,6 +23,7 @@ import java.io.PrintWriter;
import java.sql.Time; import java.sql.Time;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -46,8 +47,8 @@ import org.slf4j.Logger;
class PendingReplicationBlocks { class PendingReplicationBlocks {
private static final Logger LOG = BlockManager.LOG; private static final Logger LOG = BlockManager.LOG;
private final Map<Block, PendingBlockInfo> pendingReplications; private final Map<BlockInfo, PendingBlockInfo> pendingReplications;
private final ArrayList<Block> timedOutItems; private final ArrayList<BlockInfo> timedOutItems;
Daemon timerThread = null; Daemon timerThread = null;
private volatile boolean fsRunning = true; private volatile boolean fsRunning = true;
@ -62,8 +63,8 @@ class PendingReplicationBlocks {
if ( timeoutPeriod > 0 ) { if ( timeoutPeriod > 0 ) {
this.timeout = timeoutPeriod; this.timeout = timeoutPeriod;
} }
pendingReplications = new HashMap<Block, PendingBlockInfo>(); pendingReplications = new HashMap<>();
timedOutItems = new ArrayList<Block>(); timedOutItems = new ArrayList<>();
} }
void start() { void start() {
@ -76,7 +77,7 @@ class PendingReplicationBlocks {
* @param block The corresponding block * @param block The corresponding block
* @param targets The DataNodes where replicas of the block should be placed * @param targets The DataNodes where replicas of the block should be placed
*/ */
void increment(Block block, DatanodeDescriptor[] targets) { void increment(BlockInfo block, DatanodeDescriptor[] targets) {
synchronized (pendingReplications) { synchronized (pendingReplications) {
PendingBlockInfo found = pendingReplications.get(block); PendingBlockInfo found = pendingReplications.get(block);
if (found == null) { if (found == null) {
@ -93,9 +94,9 @@ class PendingReplicationBlocks {
* Decrement the number of pending replication requests * Decrement the number of pending replication requests
* for this block. * for this block.
* *
* @param The DataNode that finishes the replication * @param dn The DataNode that finishes the replication
*/ */
void decrement(Block block, DatanodeDescriptor dn) { void decrement(BlockInfo block, DatanodeDescriptor dn) {
synchronized (pendingReplications) { synchronized (pendingReplications) {
PendingBlockInfo found = pendingReplications.get(block); PendingBlockInfo found = pendingReplications.get(block);
if (found != null) { if (found != null) {
@ -115,7 +116,7 @@ class PendingReplicationBlocks {
* @param block The given block whose pending replication requests need to be * @param block The given block whose pending replication requests need to be
* removed * removed
*/ */
void remove(Block block) { void remove(BlockInfo block) {
synchronized (pendingReplications) { synchronized (pendingReplications) {
pendingReplications.remove(block); pendingReplications.remove(block);
} }
@ -138,7 +139,7 @@ class PendingReplicationBlocks {
/** /**
* How many copies of this block is pending replication? * How many copies of this block is pending replication?
*/ */
int getNumReplicas(Block block) { int getNumReplicas(BlockInfo block) {
synchronized (pendingReplications) { synchronized (pendingReplications) {
PendingBlockInfo found = pendingReplications.get(block); PendingBlockInfo found = pendingReplications.get(block);
if (found != null) { if (found != null) {
@ -153,13 +154,13 @@ class PendingReplicationBlocks {
* replication requests. Returns null if no blocks have * replication requests. Returns null if no blocks have
* timed out. * timed out.
*/ */
Block[] getTimedOutBlocks() { BlockInfo[] getTimedOutBlocks() {
synchronized (timedOutItems) { synchronized (timedOutItems) {
if (timedOutItems.size() <= 0) { if (timedOutItems.size() <= 0) {
return null; return null;
} }
Block[] blockList = timedOutItems.toArray( BlockInfo[] blockList = timedOutItems.toArray(
new Block[timedOutItems.size()]); new BlockInfo[timedOutItems.size()]);
timedOutItems.clear(); timedOutItems.clear();
return blockList; return blockList;
} }
@ -179,7 +180,7 @@ class PendingReplicationBlocks {
PendingBlockInfo(DatanodeDescriptor[] targets) { PendingBlockInfo(DatanodeDescriptor[] targets) {
this.timeStamp = monotonicNow(); this.timeStamp = monotonicNow();
this.targets = targets == null ? new ArrayList<DatanodeDescriptor>() this.targets = targets == null ? new ArrayList<DatanodeDescriptor>()
: new ArrayList<DatanodeDescriptor>(Arrays.asList(targets)); : new ArrayList<>(Arrays.asList(targets));
} }
long getTimeStamp() { long getTimeStamp() {
@ -192,9 +193,7 @@ class PendingReplicationBlocks {
void incrementReplicas(DatanodeDescriptor... newTargets) { void incrementReplicas(DatanodeDescriptor... newTargets) {
if (newTargets != null) { if (newTargets != null) {
for (DatanodeDescriptor dn : newTargets) { Collections.addAll(targets, newTargets);
targets.add(dn);
}
} }
} }
@ -232,17 +231,17 @@ class PendingReplicationBlocks {
*/ */
void pendingReplicationCheck() { void pendingReplicationCheck() {
synchronized (pendingReplications) { synchronized (pendingReplications) {
Iterator<Map.Entry<Block, PendingBlockInfo>> iter = Iterator<Map.Entry<BlockInfo, PendingBlockInfo>> iter =
pendingReplications.entrySet().iterator(); pendingReplications.entrySet().iterator();
long now = monotonicNow(); long now = monotonicNow();
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("PendingReplicationMonitor checking Q"); LOG.debug("PendingReplicationMonitor checking Q");
} }
while (iter.hasNext()) { while (iter.hasNext()) {
Map.Entry<Block, PendingBlockInfo> entry = iter.next(); Map.Entry<BlockInfo, PendingBlockInfo> entry = iter.next();
PendingBlockInfo pendingBlock = entry.getValue(); PendingBlockInfo pendingBlock = entry.getValue();
if (now > pendingBlock.getTimeStamp() + timeout) { if (now > pendingBlock.getTimeStamp() + timeout) {
Block block = entry.getKey(); BlockInfo block = entry.getKey();
synchronized (timedOutItems) { synchronized (timedOutItems) {
timedOutItems.add(block); timedOutItems.add(block);
} }
@ -275,16 +274,14 @@ class PendingReplicationBlocks {
synchronized (pendingReplications) { synchronized (pendingReplications) {
out.println("Metasave: Blocks being replicated: " + out.println("Metasave: Blocks being replicated: " +
pendingReplications.size()); pendingReplications.size());
Iterator<Map.Entry<Block, PendingBlockInfo>> iter = for (Map.Entry<BlockInfo, PendingBlockInfo> entry :
pendingReplications.entrySet().iterator(); pendingReplications.entrySet()) {
while (iter.hasNext()) {
Map.Entry<Block, PendingBlockInfo> entry = iter.next();
PendingBlockInfo pendingBlock = entry.getValue(); PendingBlockInfo pendingBlock = entry.getValue();
Block block = entry.getKey(); Block block = entry.getKey();
out.println(block + out.println(block +
" StartTime: " + new Time(pendingBlock.timeStamp) + " StartTime: " + new Time(pendingBlock.timeStamp) +
" NumReplicaInProgress: " + " NumReplicaInProgress: " +
pendingBlock.getNumReplicas()); pendingBlock.getNumReplicas());
} }
} }
} }

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
@ -35,7 +34,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
* *
* <p/> * <p/>
* The policy for choosing which priority to give added blocks * The policy for choosing which priority to give added blocks
* is implemented in {@link #getPriority(Block, int, int, int)}. * is implemented in {@link #getPriority(int, int, int)}.
* </p> * </p>
* <p>The queue order is as follows:</p> * <p>The queue order is as follows:</p>
* <ol> * <ol>
@ -62,7 +61,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
* blocks that are not corrupt higher priority.</li> * blocks that are not corrupt higher priority.</li>
* </ol> * </ol>
*/ */
class UnderReplicatedBlocks implements Iterable<Block> { class UnderReplicatedBlocks implements Iterable<BlockInfo> {
/** The total number of queues : {@value} */ /** The total number of queues : {@value} */
static final int LEVEL = 5; static final int LEVEL = 5;
/** The queue with the highest priority: {@value} */ /** The queue with the highest priority: {@value} */
@ -78,8 +77,8 @@ class UnderReplicatedBlocks implements Iterable<Block> {
/** The queue for corrupt blocks: {@value} */ /** The queue for corrupt blocks: {@value} */
static final int QUEUE_WITH_CORRUPT_BLOCKS = 4; static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
/** the queues themselves */ /** the queues themselves */
private final List<LightWeightLinkedSet<Block>> priorityQueues private final List<LightWeightLinkedSet<BlockInfo>> priorityQueues
= new ArrayList<LightWeightLinkedSet<Block>>(LEVEL); = new ArrayList<>(LEVEL);
/** The number of corrupt blocks with replication factor 1 */ /** The number of corrupt blocks with replication factor 1 */
private int corruptReplOneBlocks = 0; private int corruptReplOneBlocks = 0;
@ -87,7 +86,7 @@ class UnderReplicatedBlocks implements Iterable<Block> {
/** Create an object. */ /** Create an object. */
UnderReplicatedBlocks() { UnderReplicatedBlocks() {
for (int i = 0; i < LEVEL; i++) { for (int i = 0; i < LEVEL; i++) {
priorityQueues.add(new LightWeightLinkedSet<Block>()); priorityQueues.add(new LightWeightLinkedSet<BlockInfo>());
} }
} }
@ -131,8 +130,8 @@ class UnderReplicatedBlocks implements Iterable<Block> {
} }
/** Check if a block is in the neededReplication queue */ /** Check if a block is in the neededReplication queue */
synchronized boolean contains(Block block) { synchronized boolean contains(BlockInfo block) {
for(LightWeightLinkedSet<Block> set : priorityQueues) { for(LightWeightLinkedSet<BlockInfo> set : priorityQueues) {
if (set.contains(block)) { if (set.contains(block)) {
return true; return true;
} }
@ -141,13 +140,11 @@ class UnderReplicatedBlocks implements Iterable<Block> {
} }
/** Return the priority of a block /** Return the priority of a block
* @param block a under replicated block
* @param curReplicas current number of replicas of the block * @param curReplicas current number of replicas of the block
* @param expectedReplicas expected number of replicas of the block * @param expectedReplicas expected number of replicas of the block
* @return the priority for the blocks, between 0 and ({@link #LEVEL}-1) * @return the priority for the blocks, between 0 and ({@link #LEVEL}-1)
*/ */
private int getPriority(Block block, private int getPriority(int curReplicas,
int curReplicas,
int decommissionedReplicas, int decommissionedReplicas,
int expectedReplicas) { int expectedReplicas) {
assert curReplicas >= 0 : "Negative replicas!"; assert curReplicas >= 0 : "Negative replicas!";
@ -183,12 +180,12 @@ class UnderReplicatedBlocks implements Iterable<Block> {
* @param expectedReplicas expected number of replicas of the block * @param expectedReplicas expected number of replicas of the block
* @return true if the block was added to a queue. * @return true if the block was added to a queue.
*/ */
synchronized boolean add(Block block, synchronized boolean add(BlockInfo block,
int curReplicas, int curReplicas,
int decomissionedReplicas, int decomissionedReplicas,
int expectedReplicas) { int expectedReplicas) {
assert curReplicas >= 0 : "Negative replicas!"; assert curReplicas >= 0 : "Negative replicas!";
int priLevel = getPriority(block, curReplicas, decomissionedReplicas, int priLevel = getPriority(curReplicas, decomissionedReplicas,
expectedReplicas); expectedReplicas);
if(priorityQueues.get(priLevel).add(block)) { if(priorityQueues.get(priLevel).add(block)) {
if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS && if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
@ -207,11 +204,11 @@ class UnderReplicatedBlocks implements Iterable<Block> {
} }
/** remove a block from a under replication queue */ /** remove a block from a under replication queue */
synchronized boolean remove(Block block, synchronized boolean remove(BlockInfo block,
int oldReplicas, int oldReplicas,
int decommissionedReplicas, int decommissionedReplicas,
int oldExpectedReplicas) { int oldExpectedReplicas) {
int priLevel = getPriority(block, oldReplicas, int priLevel = getPriority(oldReplicas,
decommissionedReplicas, decommissionedReplicas,
oldExpectedReplicas); oldExpectedReplicas);
boolean removedBlock = remove(block, priLevel); boolean removedBlock = remove(block, priLevel);
@ -241,8 +238,8 @@ class UnderReplicatedBlocks implements Iterable<Block> {
* @param priLevel expected privilege level * @param priLevel expected privilege level
* @return true if the block was found and removed from one of the priority queues * @return true if the block was found and removed from one of the priority queues
*/ */
boolean remove(Block block, int priLevel) { boolean remove(BlockInfo block, int priLevel) {
if(priLevel >= 0 && priLevel < LEVEL if(priLevel >= 0 && priLevel < LEVEL
&& priorityQueues.get(priLevel).remove(block)) { && priorityQueues.get(priLevel).remove(block)) {
NameNode.blockStateChangeLog.debug( NameNode.blockStateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block {}" + "BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block {}" +
@ -279,14 +276,16 @@ class UnderReplicatedBlocks implements Iterable<Block> {
* @param curReplicasDelta the change in the replicate count from before * @param curReplicasDelta the change in the replicate count from before
* @param expectedReplicasDelta the change in the expected replica count from before * @param expectedReplicasDelta the change in the expected replica count from before
*/ */
synchronized void update(Block block, int curReplicas, synchronized void update(BlockInfo block, int curReplicas,
int decommissionedReplicas, int decommissionedReplicas,
int curExpectedReplicas, int curExpectedReplicas,
int curReplicasDelta, int expectedReplicasDelta) { int curReplicasDelta, int expectedReplicasDelta) {
int oldReplicas = curReplicas-curReplicasDelta; int oldReplicas = curReplicas-curReplicasDelta;
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta; int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
int curPri = getPriority(block, curReplicas, decommissionedReplicas, curExpectedReplicas); int curPri = getPriority(curReplicas, decommissionedReplicas,
int oldPri = getPriority(block, oldReplicas, decommissionedReplicas, oldExpectedReplicas); curExpectedReplicas);
int oldPri = getPriority(oldReplicas, decommissionedReplicas,
oldExpectedReplicas);
if(NameNode.stateChangeLog.isDebugEnabled()) { if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " + NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " +
block + block +
@ -336,12 +335,12 @@ class UnderReplicatedBlocks implements Iterable<Block> {
* @return Return a list of block lists to be replicated. The block list index * @return Return a list of block lists to be replicated. The block list index
* represents its replication priority. * represents its replication priority.
*/ */
public synchronized List<List<Block>> chooseUnderReplicatedBlocks( public synchronized List<List<BlockInfo>> chooseUnderReplicatedBlocks(
int blocksToProcess) { int blocksToProcess) {
// initialize data structure for the return value // initialize data structure for the return value
List<List<Block>> blocksToReplicate = new ArrayList<List<Block>>(LEVEL); List<List<BlockInfo>> blocksToReplicate = new ArrayList<>(LEVEL);
for (int i = 0; i < LEVEL; i++) { for (int i = 0; i < LEVEL; i++) {
blocksToReplicate.add(new ArrayList<Block>()); blocksToReplicate.add(new ArrayList<BlockInfo>());
} }
if (size() == 0) { // There are no blocks to collect. if (size() == 0) { // There are no blocks to collect.
@ -364,7 +363,7 @@ class UnderReplicatedBlocks implements Iterable<Block> {
// Loop through all remaining blocks in the list. // Loop through all remaining blocks in the list.
while (blockCount < blocksToProcess while (blockCount < blocksToProcess
&& neededReplicationsIterator.hasNext()) { && neededReplicationsIterator.hasNext()) {
Block block = neededReplicationsIterator.next(); BlockInfo block = neededReplicationsIterator.next();
blocksToReplicate.get(priority).add(block); blocksToReplicate.get(priority).add(block);
blockCount++; blockCount++;
} }
@ -396,10 +395,10 @@ class UnderReplicatedBlocks implements Iterable<Block> {
/** /**
* An iterator over blocks. * An iterator over blocks.
*/ */
class BlockIterator implements Iterator<Block> { class BlockIterator implements Iterator<BlockInfo> {
private int level; private int level;
private boolean isIteratorForLevel = false; private boolean isIteratorForLevel = false;
private final List<Iterator<Block>> iterators = new ArrayList<Iterator<Block>>(); private final List<Iterator<BlockInfo>> iterators = new ArrayList<>();
/** /**
* Construct an iterator over all queues. * Construct an iterator over all queues.
@ -431,7 +430,7 @@ class UnderReplicatedBlocks implements Iterable<Block> {
} }
@Override @Override
public Block next() { public BlockInfo next() {
if (isIteratorForLevel) { if (isIteratorForLevel) {
return iterators.get(0).next(); return iterators.get(0).next();
} }

View File

@ -25,12 +25,12 @@ import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.util.EnumCounters; import org.apache.hadoop.hdfs.util.EnumCounters;
@ -148,8 +148,8 @@ public class FSDirAttrOp {
} }
final short[] blockRepls = new short[2]; // 0: old, 1: new final short[] blockRepls = new short[2]; // 0: old, 1: new
final Block[] blocks = unprotectedSetReplication(fsd, src, replication, final BlockInfo[] blocks = unprotectedSetReplication(fsd, src,
blockRepls); replication, blockRepls);
isFile = blocks != null; isFile = blocks != null;
if (isFile) { if (isFile) {
fsd.getEditLog().logSetReplication(src, replication); fsd.getEditLog().logSetReplication(src, replication);
@ -375,7 +375,7 @@ public class FSDirAttrOp {
} }
} }
static Block[] unprotectedSetReplication( static BlockInfo[] unprotectedSetReplication(
FSDirectory fsd, String src, short replication, short[] blockRepls) FSDirectory fsd, String src, short replication, short[] blockRepls)
throws QuotaExceededException, UnresolvedLinkException, throws QuotaExceededException, UnresolvedLinkException,
SnapshotAccessControlException { SnapshotAccessControlException {

View File

@ -3170,8 +3170,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* of blocks that need to be removed from blocksMap * of blocks that need to be removed from blocksMap
*/ */
void removeBlocks(BlocksMapUpdateInfo blocks) { void removeBlocks(BlocksMapUpdateInfo blocks) {
List<Block> toDeleteList = blocks.getToDeleteList(); List<BlockInfo> toDeleteList = blocks.getToDeleteList();
Iterator<Block> iter = toDeleteList.iterator(); Iterator<BlockInfo> iter = toDeleteList.iterator();
while (iter.hasNext()) { while (iter.hasNext()) {
writeLock(); writeLock();
try { try {
@ -3227,12 +3227,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
boolean trackBlockCounts = isSafeModeTrackingBlocks(); boolean trackBlockCounts = isSafeModeTrackingBlocks();
int numRemovedComplete = 0, numRemovedSafe = 0; int numRemovedComplete = 0, numRemovedSafe = 0;
for (Block b : blocks.getToDeleteList()) { for (BlockInfo b : blocks.getToDeleteList()) {
if (trackBlockCounts) { if (trackBlockCounts) {
BlockInfo bi = getStoredBlock(b); if (b.isComplete()) {
if (bi.isComplete()) {
numRemovedComplete++; numRemovedComplete++;
if (bi.numNodes() >= blockManager.minReplication) { if (blockManager.checkMinReplication(b)) {
numRemovedSafe++; numRemovedSafe++;
} }
} }
@ -4151,7 +4150,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
boolean changed = false; boolean changed = false;
writeLock(); writeLock();
try { try {
final Iterator<Block> it = blockManager.getCorruptReplicaBlockIterator(); final Iterator<BlockInfo> it =
blockManager.getCorruptReplicaBlockIterator();
while (it.hasNext()) { while (it.hasNext()) {
Block b = it.next(); Block b = it.next();
@ -5093,7 +5093,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
} }
@Override @Override
public void decrementSafeBlockCount(Block b) { public void decrementSafeBlockCount(BlockInfo b) {
// safeMode is volatile, and may be set to null at any time // safeMode is volatile, and may be set to null at any time
SafeModeInfo safeMode = this.safeMode; SafeModeInfo safeMode = this.safeMode;
if (safeMode == null) // mostly true if (safeMode == null) // mostly true
@ -5918,7 +5918,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
} }
// print a limited # of corrupt files per call // print a limited # of corrupt files per call
final Iterator<Block> blkIterator = blockManager.getCorruptReplicaBlockIterator(); final Iterator<BlockInfo> blkIterator =
blockManager.getCorruptReplicaBlockIterator();
int skip = getIntCookie(cookieTab[0]); int skip = getIntCookie(cookieTab[0]);
for (int i = 0; i < skip && blkIterator.hasNext(); i++) { for (int i = 0; i < skip && blkIterator.hasNext(); i++) {
@ -5926,7 +5927,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
} }
while (blkIterator.hasNext()) { while (blkIterator.hasNext()) {
Block blk = blkIterator.next(); BlockInfo blk = blkIterator.next();
final INode inode = (INode)blockManager.getBlockCollection(blk); final INode inode = (INode)blockManager.getBlockCollection(blk);
skip++; skip++;
if (inode != null && blockManager.countNodes(blk).liveReplicas() == 0) { if (inode != null && blockManager.countNodes(blk).liveReplicas() == 0) {

View File

@ -34,9 +34,9 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.namenode.INodeReference.DstReference; import org.apache.hadoop.hdfs.server.namenode.INodeReference.DstReference;
import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName; import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName;
@ -950,8 +950,8 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
/** /**
* The list of blocks that need to be removed from blocksMap * The list of blocks that need to be removed from blocksMap
*/ */
private final List<Block> toDeleteList; private final List<BlockInfo> toDeleteList;
public BlocksMapUpdateInfo() { public BlocksMapUpdateInfo() {
toDeleteList = new ChunkedArrayList<>(); toDeleteList = new ChunkedArrayList<>();
} }
@ -959,7 +959,7 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
/** /**
* @return The list of blocks that need to be removed from blocksMap * @return The list of blocks that need to be removed from blocksMap
*/ */
public List<Block> getToDeleteList() { public List<BlockInfo> getToDeleteList() {
return toDeleteList; return toDeleteList;
} }
@ -968,12 +968,12 @@ public abstract class INode implements INodeAttributes, Diff.Element<byte[]> {
* {@link BlocksMapUpdateInfo#toDeleteList} * {@link BlocksMapUpdateInfo#toDeleteList}
* @param toDelete the to-be-deleted block * @param toDelete the to-be-deleted block
*/ */
public void addDeleteBlock(Block toDelete) { public void addDeleteBlock(BlockInfo toDelete) {
assert toDelete != null : "toDelete is null"; assert toDelete != null : "toDelete is null";
toDeleteList.add(toDelete); toDeleteList.add(toDelete);
} }
public void removeDeleteBlock(Block block) { public void removeDeleteBlock(BlockInfo block) {
assert block != null : "block is null"; assert block != null : "block is null";
toDeleteList.remove(block); toDeleteList.remove(block);
} }

View File

@ -904,8 +904,8 @@ public class INodeFile extends INodeWithAdditionalFields
getDiffs().findEarlierSnapshotBlocks(snapshotId); getDiffs().findEarlierSnapshotBlocks(snapshotId);
if(snapshotBlocks == null) if(snapshotBlocks == null)
return; return;
List<Block> toDelete = collectedBlocks.getToDeleteList(); List<BlockInfo> toDelete = collectedBlocks.getToDeleteList();
for(Block blk : snapshotBlocks) { for(BlockInfo blk : snapshotBlocks) {
if(toDelete.contains(blk)) if(toDelete.contains(blk))
collectedBlocks.removeDeleteBlock(blk); collectedBlocks.removeDeleteBlock(blk);
} }

View File

@ -109,7 +109,7 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private @InterfaceAudience.Private
public class NamenodeFsck implements DataEncryptionKeyFactory { public class NamenodeFsck implements DataEncryptionKeyFactory {
public static final Log LOG = LogFactory.getLog(NameNode.class.getName()); public static final Log LOG = LogFactory.getLog(NameNode.class.getName());
// return string marking fsck status // return string marking fsck status
public static final String CORRUPT_STATUS = "is CORRUPT"; public static final String CORRUPT_STATUS = "is CORRUPT";
public static final String HEALTHY_STATUS = "is HEALTHY"; public static final String HEALTHY_STATUS = "is HEALTHY";
@ -117,7 +117,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
public static final String DECOMMISSIONED_STATUS = "is DECOMMISSIONED"; public static final String DECOMMISSIONED_STATUS = "is DECOMMISSIONED";
public static final String NONEXISTENT_STATUS = "does not exist"; public static final String NONEXISTENT_STATUS = "does not exist";
public static final String FAILURE_STATUS = "FAILED"; public static final String FAILURE_STATUS = "FAILED";
private final NameNode namenode; private final NameNode namenode;
private final NetworkTopology networktopology; private final NetworkTopology networktopology;
private final int totalDatanodes; private final int totalDatanodes;
@ -143,14 +143,14 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
*/ */
private boolean internalError = false; private boolean internalError = false;
/** /**
* True if the user specified the -move option. * True if the user specified the -move option.
* *
* Whe this option is in effect, we will copy salvaged blocks into the lost * Whe this option is in effect, we will copy salvaged blocks into the lost
* and found. */ * and found. */
private boolean doMove = false; private boolean doMove = false;
/** /**
* True if the user specified the -delete option. * True if the user specified the -delete option.
* *
* Whe this option is in effect, we will delete corrupted files. * Whe this option is in effect, we will delete corrupted files.
@ -183,7 +183,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
* @param remoteAddress source address of the fsck request * @param remoteAddress source address of the fsck request
*/ */
NamenodeFsck(Configuration conf, NameNode namenode, NamenodeFsck(Configuration conf, NameNode namenode,
NetworkTopology networktopology, NetworkTopology networktopology,
Map<String,String[]> pmap, PrintWriter out, Map<String,String[]> pmap, PrintWriter out,
int totalDatanodes, InetAddress remoteAddress) { int totalDatanodes, InetAddress remoteAddress) {
this.conf = conf; this.conf = conf;
@ -199,7 +199,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
this.staleInterval = this.staleInterval =
conf.getLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, conf.getLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT); DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) { for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
String key = it.next(); String key = it.next();
if (key.equals("path")) { this.path = pmap.get("path")[0]; } if (key.equals("path")) { this.path = pmap.get("path")[0]; }
@ -251,7 +251,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
} }
BlockCollection bc = bm.getBlockCollection(blockInfo); BlockCollection bc = bm.getBlockCollection(blockInfo);
INode iNode = (INode) bc; INode iNode = (INode) bc;
NumberReplicas numberReplicas= bm.countNodes(block); NumberReplicas numberReplicas= bm.countNodes(blockInfo);
out.println("Block Id: " + blockId); out.println("Block Id: " + blockId);
out.println("Block belongs to: "+iNode.getFullPathName()); out.println("Block belongs to: "+iNode.getFullPathName());
out.println("No. of Expected Replica: " + out.println("No. of Expected Replica: " +
@ -350,7 +350,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
listCorruptFileBlocks(); listCorruptFileBlocks();
return; return;
} }
if (this.showStoragePolcies) { if (this.showStoragePolcies) {
storageTypeSummary = new StoragePolicySummary( storageTypeSummary = new StoragePolicySummary(
namenode.getNamesystem().getBlockManager().getStoragePolicies()); namenode.getNamesystem().getBlockManager().getStoragePolicies());
@ -380,7 +380,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
// DFSck client scans for the string HEALTHY/CORRUPT to check the status // DFSck client scans for the string HEALTHY/CORRUPT to check the status
// of file system and return appropriate code. Changing the output // of file system and return appropriate code. Changing the output
// string might break testcases. Also note this must be the last line // string might break testcases. Also note this must be the last line
// of the report. // of the report.
if (res.isHealthy()) { if (res.isHealthy()) {
out.print("\n\nThe filesystem under path '" + path + "' " + HEALTHY_STATUS); out.print("\n\nThe filesystem under path '" + path + "' " + HEALTHY_STATUS);
@ -423,7 +423,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
+ " CORRUPT files"); + " CORRUPT files");
out.println(); out.println();
} }
@VisibleForTesting @VisibleForTesting
void check(String parent, HdfsFileStatus file, Result res) throws IOException { void check(String parent, HdfsFileStatus file, Result res) throws IOException {
String path = file.getFullName(parent); String path = file.getFullName(parent);
@ -480,7 +480,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
throws IOException { throws IOException {
long fileLen = file.getLen(); long fileLen = file.getLen();
LocatedBlocks blocks = null; LocatedBlocks blocks = null;
FSNamesystem fsn = namenode.getNamesystem(); final FSNamesystem fsn = namenode.getNamesystem();
fsn.readLock(); fsn.readLock();
try { try {
blocks = fsn.getBlockLocations( blocks = fsn.getBlockLocations(
@ -539,8 +539,10 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
ExtendedBlock block = lBlk.getBlock(); ExtendedBlock block = lBlk.getBlock();
BlockManager bm = namenode.getNamesystem().getBlockManager(); BlockManager bm = namenode.getNamesystem().getBlockManager();
final BlockInfo storedBlock = bm.getStoredBlock(
block.getLocalBlock());
// count decommissionedReplicas / decommissioningReplicas // count decommissionedReplicas / decommissioningReplicas
NumberReplicas numberReplicas = bm.countNodes(block.getLocalBlock()); NumberReplicas numberReplicas = bm.countNodes(storedBlock);
int decommissionedReplicas = numberReplicas.decommissioned();; int decommissionedReplicas = numberReplicas.decommissioned();;
int decommissioningReplicas = numberReplicas.decommissioning(); int decommissioningReplicas = numberReplicas.decommissioning();
res.decommissionedReplicas += decommissionedReplicas; res.decommissionedReplicas += decommissionedReplicas;
@ -608,7 +610,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
out.println(); out.println();
out.print(path + ": "); out.print(path + ": ");
} }
out.println(" Replica placement policy is violated for " + out.println(" Replica placement policy is violated for " +
block + ". " + blockPlacementStatus.getErrorDescription()); block + ". " + blockPlacementStatus.getErrorDescription());
} }
@ -743,7 +745,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
return false; return false;
} }
} }
private void copyBlocksToLostFound(String parent, HdfsFileStatus file, private void copyBlocksToLostFound(String parent, HdfsFileStatus file,
LocatedBlocks blocks) throws IOException { LocatedBlocks blocks) throws IOException {
final DFSClient dfs = new DFSClient(NameNode.getAddress(conf), conf); final DFSClient dfs = new DFSClient(NameNode.getAddress(conf), conf);
@ -784,7 +786,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
fos = dfs.create(target + "/" + chain, true); fos = dfs.create(target + "/" + chain, true);
chain++; chain++;
} }
// copy the block. It's a pity it's not abstracted from DFSInputStream ... // copy the block. It's a pity it's not abstracted from DFSInputStream ...
try { try {
copyBlock(dfs, lblock, fos); copyBlock(dfs, lblock, fos);
@ -802,7 +804,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
LOG.warn("Fsck: there were errors copying the remains of the " + LOG.warn("Fsck: there were errors copying the remains of the " +
"corrupted file " + fullName + " to /lost+found"); "corrupted file " + fullName + " to /lost+found");
} else { } else {
LOG.info("Fsck: copied the remains of the corrupted file " + LOG.info("Fsck: copied the remains of the corrupted file " +
fullName + " to /lost+found"); fullName + " to /lost+found");
} }
} catch (Exception e) { } catch (Exception e) {
@ -813,7 +815,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
dfs.close(); dfs.close();
} }
} }
/* /*
* XXX (ab) Bulk of this method is copied verbatim from {@link DFSClient}, which is * XXX (ab) Bulk of this method is copied verbatim from {@link DFSClient}, which is
* bad. Both places should be refactored to provide a method to copy blocks * bad. Both places should be refactored to provide a method to copy blocks
@ -824,12 +826,12 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
int failures = 0; int failures = 0;
InetSocketAddress targetAddr = null; InetSocketAddress targetAddr = null;
TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>(); TreeSet<DatanodeInfo> deadNodes = new TreeSet<DatanodeInfo>();
BlockReader blockReader = null; BlockReader blockReader = null;
ExtendedBlock block = lblock.getBlock(); ExtendedBlock block = lblock.getBlock();
while (blockReader == null) { while (blockReader == null) {
DatanodeInfo chosenNode; DatanodeInfo chosenNode;
try { try {
chosenNode = bestNode(dfs, lblock.getLocations(), deadNodes); chosenNode = bestNode(dfs, lblock.getLocations(), deadNodes);
targetAddr = NetUtils.createSocketAddr(chosenNode.getXferAddr()); targetAddr = NetUtils.createSocketAddr(chosenNode.getXferAddr());
@ -900,7 +902,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
bytesRead += cnt; bytesRead += cnt;
} }
if ( bytesRead != block.getNumBytes() ) { if ( bytesRead != block.getNumBytes() ) {
throw new IOException("Recorded block size is " + block.getNumBytes() + throw new IOException("Recorded block size is " + block.getNumBytes() +
", but datanode returned " +bytesRead+" bytes"); ", but datanode returned " +bytesRead+" bytes");
} }
} catch (Exception e) { } catch (Exception e) {
@ -937,12 +939,12 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
} while (deadNodes.contains(chosenNode)); } while (deadNodes.contains(chosenNode));
return chosenNode; return chosenNode;
} }
private void lostFoundInit(DFSClient dfs) { private void lostFoundInit(DFSClient dfs) {
lfInited = true; lfInited = true;
try { try {
String lfName = "/lost+found"; String lfName = "/lost+found";
final HdfsFileStatus lfStatus = dfs.getFileInfo(lfName); final HdfsFileStatus lfStatus = dfs.getFileInfo(lfName);
if (lfStatus == null) { // not exists if (lfStatus == null) { // not exists
lfInitedOk = dfs.mkdirs(lfName, null, true); lfInitedOk = dfs.mkdirs(lfName, null, true);
@ -997,21 +999,21 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
final short replication; final short replication;
final int minReplication; final int minReplication;
Result(Configuration conf) { Result(Configuration conf) {
this.replication = (short)conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, this.replication = (short)conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
DFSConfigKeys.DFS_REPLICATION_DEFAULT); DFSConfigKeys.DFS_REPLICATION_DEFAULT);
this.minReplication = (short)conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, this.minReplication = (short)conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT); DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
} }
/** /**
* DFS is considered healthy if there are no missing blocks. * DFS is considered healthy if there are no missing blocks.
*/ */
boolean isHealthy() { boolean isHealthy() {
return ((missingIds.size() == 0) && (corruptBlocks == 0)); return ((missingIds.size() == 0) && (corruptBlocks == 0));
} }
/** Add a missing block name, plus its size. */ /** Add a missing block name, plus its size. */
void addMissing(String id, long size) { void addMissing(String id, long size) {
missingIds.add(id); missingIds.add(id);
@ -1030,7 +1032,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
return 0.0f; return 0.0f;
return (float) (totalReplicas) / (float) totalBlocks; return (float) (totalReplicas) / (float) totalBlocks;
} }
@Override @Override
public String toString() { public String toString() {
StringBuilder res = new StringBuilder(); StringBuilder res = new StringBuilder();

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
/** SafeMode related operations. */ /** SafeMode related operations. */
@InterfaceAudience.Private @InterfaceAudience.Private
@ -49,5 +49,5 @@ public interface SafeMode {
public void incrementSafeBlockCount(int replication); public void incrementSafeBlockCount(int replication);
/** Decrement number of blocks that reached minimal replication. */ /** Decrement number of blocks that reached minimal replication. */
public void decrementSafeBlockCount(Block b); public void decrementSafeBlockCount(BlockInfo b);
} }

View File

@ -69,9 +69,10 @@ public class BlockManagerTestUtil {
final BlockManager bm = namesystem.getBlockManager(); final BlockManager bm = namesystem.getBlockManager();
namesystem.readLock(); namesystem.readLock();
try { try {
final BlockInfo storedBlock = bm.getStoredBlock(b);
return new int[]{getNumberOfRacks(bm, b), return new int[]{getNumberOfRacks(bm, b),
bm.countNodes(b).liveReplicas(), bm.countNodes(storedBlock).liveReplicas(),
bm.neededReplications.contains(b) ? 1 : 0}; bm.neededReplications.contains(storedBlock) ? 1 : 0};
} finally { } finally {
namesystem.readUnlock(); namesystem.readUnlock();
} }

View File

@ -440,14 +440,14 @@ public class TestBlockManager {
return blockInfo; return blockInfo;
} }
private DatanodeStorageInfo[] scheduleSingleReplication(Block block) { private DatanodeStorageInfo[] scheduleSingleReplication(BlockInfo block) {
// list for priority 1 // list for priority 1
List<Block> list_p1 = new ArrayList<Block>(); List<BlockInfo> list_p1 = new ArrayList<>();
list_p1.add(block); list_p1.add(block);
// list of lists for each priority // list of lists for each priority
List<List<Block>> list_all = new ArrayList<List<Block>>(); List<List<BlockInfo>> list_all = new ArrayList<>();
list_all.add(new ArrayList<Block>()); // for priority 0 list_all.add(new ArrayList<BlockInfo>()); // for priority 0
list_all.add(list_p1); // for priority 1 list_all.add(list_p1); // for priority 1
assertEquals("Block not initially pending replication", 0, assertEquals("Block not initially pending replication", 0,

View File

@ -166,10 +166,11 @@ public class TestNodeCount {
/* threadsafe read of the replication counts for this block */ /* threadsafe read of the replication counts for this block */
NumberReplicas countNodes(Block block, FSNamesystem namesystem) { NumberReplicas countNodes(Block block, FSNamesystem namesystem) {
BlockManager blockManager = namesystem.getBlockManager();
namesystem.readLock(); namesystem.readLock();
try { try {
lastBlock = block; lastBlock = block;
lastNum = namesystem.getBlockManager().countNodes(block); lastNum = blockManager.countNodes(blockManager.getStoredBlock(block));
return lastNum; return lastNum;
} }
finally { finally {

View File

@ -117,7 +117,8 @@ public class TestOverReplicatedBlocks {
// corrupt one won't be chosen to be excess one // corrupt one won't be chosen to be excess one
// without 4910 the number of live replicas would be 0: block gets lost // without 4910 the number of live replicas would be 0: block gets lost
assertEquals(1, bm.countNodes(block.getLocalBlock()).liveReplicas()); assertEquals(1, bm.countNodes(
bm.getStoredBlock(block.getLocalBlock())).liveReplicas());
} }
} finally { } finally {
namesystem.writeUnlock(); namesystem.writeUnlock();
@ -219,7 +220,7 @@ public class TestOverReplicatedBlocks {
out.close(); out.close();
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, p); ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, p);
assertEquals("Expected only one live replica for the block", 1, bm assertEquals("Expected only one live replica for the block", 1, bm
.countNodes(block.getLocalBlock()).liveReplicas()); .countNodes(bm.getStoredBlock(block.getLocalBlock())).liveReplicas());
} finally { } finally {
cluster.shutdown(); cluster.shutdown();
} }

View File

@ -53,6 +53,12 @@ public class TestPendingReplication {
private static final int DFS_REPLICATION_INTERVAL = 1; private static final int DFS_REPLICATION_INTERVAL = 1;
// Number of datanodes in the cluster // Number of datanodes in the cluster
private static final int DATANODE_COUNT = 5; private static final int DATANODE_COUNT = 5;
private BlockInfo genBlockInfo(long id, long length, long gs) {
return new BlockInfoContiguous(new Block(id, length, gs),
(short) DATANODE_COUNT);
}
@Test @Test
public void testPendingReplication() { public void testPendingReplication() {
PendingReplicationBlocks pendingReplications; PendingReplicationBlocks pendingReplications;
@ -63,7 +69,7 @@ public class TestPendingReplication {
// //
DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(10); DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(10);
for (int i = 0; i < storages.length; i++) { for (int i = 0; i < storages.length; i++) {
Block block = new Block(i, i, 0); BlockInfo block = genBlockInfo(i, i, 0);
DatanodeStorageInfo[] targets = new DatanodeStorageInfo[i]; DatanodeStorageInfo[] targets = new DatanodeStorageInfo[i];
System.arraycopy(storages, 0, targets, 0, i); System.arraycopy(storages, 0, targets, 0, i);
pendingReplications.increment(block, pendingReplications.increment(block,
@ -76,7 +82,7 @@ public class TestPendingReplication {
// //
// remove one item and reinsert it // remove one item and reinsert it
// //
Block blk = new Block(8, 8, 0); BlockInfo blk = genBlockInfo(8, 8, 0);
pendingReplications.decrement(blk, storages[7].getDatanodeDescriptor()); // removes one replica pendingReplications.decrement(blk, storages[7].getDatanodeDescriptor()); // removes one replica
assertEquals("pendingReplications.getNumReplicas ", assertEquals("pendingReplications.getNumReplicas ",
7, pendingReplications.getNumReplicas(blk)); 7, pendingReplications.getNumReplicas(blk));
@ -96,7 +102,7 @@ public class TestPendingReplication {
// are sane. // are sane.
// //
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
Block block = new Block(i, i, 0); BlockInfo block = genBlockInfo(i, i, 0);
int numReplicas = pendingReplications.getNumReplicas(block); int numReplicas = pendingReplications.getNumReplicas(block);
assertTrue(numReplicas == i); assertTrue(numReplicas == i);
} }
@ -115,7 +121,7 @@ public class TestPendingReplication {
} }
for (int i = 10; i < 15; i++) { for (int i = 10; i < 15; i++) {
Block block = new Block(i, i, 0); BlockInfo block = genBlockInfo(i, i, 0);
pendingReplications.increment(block, pendingReplications.increment(block,
DatanodeStorageInfo.toDatanodeDescriptors( DatanodeStorageInfo.toDatanodeDescriptors(
DFSTestUtil.createDatanodeStorageInfos(i))); DFSTestUtil.createDatanodeStorageInfos(i)));
@ -180,7 +186,7 @@ public class TestPendingReplication {
block = new Block(1, 1, 0); block = new Block(1, 1, 0);
blockInfo = new BlockInfoContiguous(block, (short) 3); blockInfo = new BlockInfoContiguous(block, (short) 3);
pendingReplications.increment(block, pendingReplications.increment(blockInfo,
DatanodeStorageInfo.toDatanodeDescriptors( DatanodeStorageInfo.toDatanodeDescriptors(
DFSTestUtil.createDatanodeStorageInfos(1))); DFSTestUtil.createDatanodeStorageInfos(1)));
BlockCollection bc = Mockito.mock(BlockCollection.class); BlockCollection bc = Mockito.mock(BlockCollection.class);
@ -195,7 +201,8 @@ public class TestPendingReplication {
// Add a second block to pendingReplications that has no // Add a second block to pendingReplications that has no
// corresponding entry in blocksmap // corresponding entry in blocksmap
block = new Block(2, 2, 0); block = new Block(2, 2, 0);
pendingReplications.increment(block, blockInfo = new BlockInfoContiguous(block, (short) 3);
pendingReplications.increment(blockInfo,
DatanodeStorageInfo.toDatanodeDescriptors( DatanodeStorageInfo.toDatanodeDescriptors(
DFSTestUtil.createDatanodeStorageInfos(1))); DFSTestUtil.createDatanodeStorageInfos(1)));
@ -275,7 +282,7 @@ public class TestPendingReplication {
assertEquals(1, blkManager.pendingReplications.size()); assertEquals(1, blkManager.pendingReplications.size());
INodeFile fileNode = fsn.getFSDirectory().getINode4Write(file).asFile(); INodeFile fileNode = fsn.getFSDirectory().getINode4Write(file).asFile();
Block[] blocks = fileNode.getBlocks(); BlockInfo[] blocks = fileNode.getBlocks();
assertEquals(DATANODE_COUNT - 1, assertEquals(DATANODE_COUNT - 1,
blkManager.pendingReplications.getNumReplicas(blocks[0])); blkManager.pendingReplications.getNumReplicas(blocks[0]));
@ -381,9 +388,9 @@ public class TestPendingReplication {
BlockManagerTestUtil.computeAllPendingWork(bm); BlockManagerTestUtil.computeAllPendingWork(bm);
BlockManagerTestUtil.updateState(bm); BlockManagerTestUtil.updateState(bm);
assertEquals(bm.getPendingReplicationBlocksCount(), 1L); assertEquals(bm.getPendingReplicationBlocksCount(), 1L);
assertEquals(bm.pendingReplications.getNumReplicas(block.getBlock() BlockInfo storedBlock = bm.getStoredBlock(block.getBlock().getLocalBlock());
.getLocalBlock()), 2); assertEquals(bm.pendingReplications.getNumReplicas(storedBlock), 2);
// 4. delete the file // 4. delete the file
fs.delete(filePath, true); fs.delete(filePath, true);
// retry at most 10 times, each time sleep for 1s. Note that 10s is much // retry at most 10 times, each time sleep for 1s. Note that 10s is much

View File

@ -58,7 +58,9 @@ public class TestRBWBlockInvalidation {
private static NumberReplicas countReplicas(final FSNamesystem namesystem, private static NumberReplicas countReplicas(final FSNamesystem namesystem,
ExtendedBlock block) { ExtendedBlock block) {
return namesystem.getBlockManager().countNodes(block.getLocalBlock()); final BlockManager blockManager = namesystem.getBlockManager();
return blockManager.countNodes(blockManager.getStoredBlock(
block.getLocalBlock()));
} }
/** /**

View File

@ -830,7 +830,11 @@ public class TestReplicationPolicy {
assertEquals(targets.length, 2); assertEquals(targets.length, 2);
assertTrue(isOnSameRack(targets[0], dataNodes[2])); assertTrue(isOnSameRack(targets[0], dataNodes[2]));
} }
private BlockInfo genBlockInfo(long id) {
return new BlockInfoContiguous(new Block(id), (short) 3);
}
/** /**
* Test for the high priority blocks are processed before the low priority * Test for the high priority blocks are processed before the low priority
* blocks. * blocks.
@ -849,16 +853,17 @@ public class TestReplicationPolicy {
.getNamesystem().getBlockManager().neededReplications; .getNamesystem().getBlockManager().neededReplications;
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
// Adding the blocks directly to normal priority // Adding the blocks directly to normal priority
neededReplications.add(new Block(ThreadLocalRandom.current()
.nextLong()), 2, 0, 3); neededReplications.add(genBlockInfo(ThreadLocalRandom.current().
nextLong()), 2, 0, 3);
} }
// Lets wait for the replication interval, to start process normal // Lets wait for the replication interval, to start process normal
// priority blocks // priority blocks
Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL); Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
// Adding the block directly to high priority list // Adding the block directly to high priority list
neededReplications.add(new Block(ThreadLocalRandom.current().nextLong()), neededReplications.add(genBlockInfo(ThreadLocalRandom.current().
1, 0, 3); nextLong()), 1, 0, 3);
// Lets wait for the replication interval // Lets wait for the replication interval
Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL); Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
@ -881,30 +886,31 @@ public class TestReplicationPolicy {
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
// Adding QUEUE_HIGHEST_PRIORITY block // Adding QUEUE_HIGHEST_PRIORITY block
underReplicatedBlocks.add(new Block(ThreadLocalRandom.current() underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
.nextLong()), 1, 0, 3); nextLong()), 1, 0, 3);
// Adding QUEUE_VERY_UNDER_REPLICATED block // Adding QUEUE_VERY_UNDER_REPLICATED block
underReplicatedBlocks.add(new Block(ThreadLocalRandom.current() underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
.nextLong()), 2, 0, 7); nextLong()), 2, 0, 7);
// Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block // Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
underReplicatedBlocks.add(new Block(ThreadLocalRandom.current() underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
.nextLong()), 6, 0, 6); nextLong()), 6, 0, 6);
// Adding QUEUE_UNDER_REPLICATED block // Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(new Block(ThreadLocalRandom.current() underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
.nextLong()), 5, 0, 6); nextLong()), 5, 0, 6);
// Adding QUEUE_WITH_CORRUPT_BLOCKS block // Adding QUEUE_WITH_CORRUPT_BLOCKS block
underReplicatedBlocks.add(new Block(ThreadLocalRandom.current() underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
.nextLong()), 0, 0, 3); nextLong()), 0, 0, 3);
} }
// Choose 6 blocks from UnderReplicatedBlocks. Then it should pick 5 blocks // Choose 6 blocks from UnderReplicatedBlocks. Then it should pick 5 blocks
// from // from
// QUEUE_HIGHEST_PRIORITY and 1 block from QUEUE_VERY_UNDER_REPLICATED. // QUEUE_HIGHEST_PRIORITY and 1 block from QUEUE_VERY_UNDER_REPLICATED.
List<List<Block>> chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(6); List<List<BlockInfo>> chosenBlocks =
underReplicatedBlocks.chooseUnderReplicatedBlocks(6);
assertTheChosenBlocks(chosenBlocks, 5, 1, 0, 0, 0); assertTheChosenBlocks(chosenBlocks, 5, 1, 0, 0, 0);
// Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 4 blocks from // Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 4 blocks from
@ -914,8 +920,8 @@ public class TestReplicationPolicy {
assertTheChosenBlocks(chosenBlocks, 0, 4, 5, 1, 0); assertTheChosenBlocks(chosenBlocks, 0, 4, 5, 1, 0);
// Adding QUEUE_HIGHEST_PRIORITY // Adding QUEUE_HIGHEST_PRIORITY
underReplicatedBlocks.add(new Block(ThreadLocalRandom.current().nextLong()), underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
1, 0, 3); nextLong()), 1, 0, 3);
// Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 1 block from // Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 1 block from
// QUEUE_HIGHEST_PRIORITY, 4 blocks from QUEUE_REPLICAS_BADLY_DISTRIBUTED // QUEUE_HIGHEST_PRIORITY, 4 blocks from QUEUE_REPLICAS_BADLY_DISTRIBUTED
@ -933,7 +939,7 @@ public class TestReplicationPolicy {
/** asserts the chosen blocks with expected priority blocks */ /** asserts the chosen blocks with expected priority blocks */
private void assertTheChosenBlocks( private void assertTheChosenBlocks(
List<List<Block>> chosenBlocks, int firstPrioritySize, List<List<BlockInfo>> chosenBlocks, int firstPrioritySize,
int secondPrioritySize, int thirdPrioritySize, int fourthPrioritySize, int secondPrioritySize, int thirdPrioritySize, int fourthPrioritySize,
int fifthPrioritySize) { int fifthPrioritySize) {
assertEquals( assertEquals(
@ -1107,9 +1113,9 @@ public class TestReplicationPolicy {
public void testUpdateDoesNotCauseSkippedReplication() { public void testUpdateDoesNotCauseSkippedReplication() {
UnderReplicatedBlocks underReplicatedBlocks = new UnderReplicatedBlocks(); UnderReplicatedBlocks underReplicatedBlocks = new UnderReplicatedBlocks();
Block block1 = new Block(ThreadLocalRandom.current().nextLong()); BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
Block block2 = new Block(ThreadLocalRandom.current().nextLong()); BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
Block block3 = new Block(ThreadLocalRandom.current().nextLong()); BlockInfo block3 = genBlockInfo(ThreadLocalRandom.current().nextLong());
// Adding QUEUE_VERY_UNDER_REPLICATED block // Adding QUEUE_VERY_UNDER_REPLICATED block
final int block1CurReplicas = 2; final int block1CurReplicas = 2;
@ -1123,7 +1129,7 @@ public class TestReplicationPolicy {
// Adding QUEUE_UNDER_REPLICATED block // Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block3, 2, 0, 6); underReplicatedBlocks.add(block3, 2, 0, 6);
List<List<Block>> chosenBlocks; List<List<BlockInfo>> chosenBlocks;
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED. // from QUEUE_VERY_UNDER_REPLICATED.
@ -1156,8 +1162,8 @@ public class TestReplicationPolicy {
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration()); BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications; UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
Block block1 = new Block(ThreadLocalRandom.current().nextLong()); BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
Block block2 = new Block(ThreadLocalRandom.current().nextLong()); BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
// Adding QUEUE_UNDER_REPLICATED block // Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block1, 0, 1, 1); underReplicatedBlocks.add(block1, 0, 1, 1);
@ -1165,7 +1171,7 @@ public class TestReplicationPolicy {
// Adding QUEUE_UNDER_REPLICATED block // Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 0, 1, 1); underReplicatedBlocks.add(block2, 0, 1, 1);
List<List<Block>> chosenBlocks; List<List<BlockInfo>> chosenBlocks;
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED. // from QUEUE_VERY_UNDER_REPLICATED.
@ -1203,8 +1209,8 @@ public class TestReplicationPolicy {
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration()); BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications; UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
Block block1 = new Block(ThreadLocalRandom.current().nextLong()); BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
Block block2 = new Block(ThreadLocalRandom.current().nextLong()); BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
// Adding QUEUE_UNDER_REPLICATED block // Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block1, 0, 1, 1); underReplicatedBlocks.add(block1, 0, 1, 1);
@ -1212,7 +1218,7 @@ public class TestReplicationPolicy {
// Adding QUEUE_UNDER_REPLICATED block // Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 0, 1, 1); underReplicatedBlocks.add(block2, 0, 1, 1);
List<List<Block>> chosenBlocks; List<List<BlockInfo>> chosenBlocks;
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED. // from QUEUE_VERY_UNDER_REPLICATED.
@ -1266,8 +1272,8 @@ public class TestReplicationPolicy {
BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration()); BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications; UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
Block block1 = new Block(ThreadLocalRandom.current().nextLong()); BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong());
Block block2 = new Block(ThreadLocalRandom.current().nextLong()); BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
// Adding QUEUE_UNDER_REPLICATED block // Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block1, 0, 1, 1); underReplicatedBlocks.add(block1, 0, 1, 1);
@ -1275,7 +1281,7 @@ public class TestReplicationPolicy {
// Adding QUEUE_UNDER_REPLICATED block // Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(block2, 0, 1, 1); underReplicatedBlocks.add(block2, 0, 1, 1);
List<List<Block>> chosenBlocks; List<List<BlockInfo>> chosenBlocks;
// Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
// from QUEUE_VERY_UNDER_REPLICATED. // from QUEUE_VERY_UNDER_REPLICATED.

View File

@ -28,6 +28,10 @@ import static org.junit.Assert.fail;
public class TestUnderReplicatedBlockQueues { public class TestUnderReplicatedBlockQueues {
private BlockInfo genBlockInfo(long id) {
return new BlockInfoContiguous(new Block(id), (short) 3);
}
/** /**
* Test that adding blocks with different replication counts puts them * Test that adding blocks with different replication counts puts them
* into different queues * into different queues
@ -36,11 +40,11 @@ public class TestUnderReplicatedBlockQueues {
@Test @Test
public void testBlockPriorities() throws Throwable { public void testBlockPriorities() throws Throwable {
UnderReplicatedBlocks queues = new UnderReplicatedBlocks(); UnderReplicatedBlocks queues = new UnderReplicatedBlocks();
Block block1 = new Block(1); BlockInfo block1 = genBlockInfo(1);
Block block2 = new Block(2); BlockInfo block2 = genBlockInfo(2);
Block block_very_under_replicated = new Block(3); BlockInfo block_very_under_replicated = genBlockInfo(3);
Block block_corrupt = new Block(4); BlockInfo block_corrupt = genBlockInfo(4);
Block block_corrupt_repl_one = new Block(5); BlockInfo block_corrupt_repl_one = genBlockInfo(5);
//add a block with a single entry //add a block with a single entry
assertAdded(queues, block1, 1, 0, 3); assertAdded(queues, block1, 1, 0, 3);
@ -82,7 +86,7 @@ public class TestUnderReplicatedBlockQueues {
} }
private void assertAdded(UnderReplicatedBlocks queues, private void assertAdded(UnderReplicatedBlocks queues,
Block block, BlockInfo block,
int curReplicas, int curReplicas,
int decomissionedReplicas, int decomissionedReplicas,
int expectedReplicas) { int expectedReplicas) {

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@ -81,6 +82,7 @@ public class TestReadOnlySharedStorage {
private DatanodeInfo readOnlyDataNode; private DatanodeInfo readOnlyDataNode;
private Block block; private Block block;
private BlockInfo storedBlock;
private ExtendedBlock extendedBlock; private ExtendedBlock extendedBlock;
@ -132,7 +134,8 @@ public class TestReadOnlySharedStorage {
LocatedBlock locatedBlock = getLocatedBlock(); LocatedBlock locatedBlock = getLocatedBlock();
extendedBlock = locatedBlock.getBlock(); extendedBlock = locatedBlock.getBlock();
block = extendedBlock.getLocalBlock(); block = extendedBlock.getLocalBlock();
storedBlock = blockManager.getStoredBlock(block);
assertThat(locatedBlock.getLocations().length, is(1)); assertThat(locatedBlock.getLocations().length, is(1));
normalDataNode = locatedBlock.getLocations()[0]; normalDataNode = locatedBlock.getLocations()[0];
readOnlyDataNode = datanodeManager.getDatanode(cluster.getDataNodes().get(RO_NODE_INDEX).getDatanodeId()); readOnlyDataNode = datanodeManager.getDatanode(cluster.getDataNodes().get(RO_NODE_INDEX).getDatanodeId());
@ -188,7 +191,7 @@ public class TestReadOnlySharedStorage {
} }
private void validateNumberReplicas(int expectedReplicas) throws IOException { private void validateNumberReplicas(int expectedReplicas) throws IOException {
NumberReplicas numberReplicas = blockManager.countNodes(block); NumberReplicas numberReplicas = blockManager.countNodes(storedBlock);
assertThat(numberReplicas.liveReplicas(), is(expectedReplicas)); assertThat(numberReplicas.liveReplicas(), is(expectedReplicas));
assertThat(numberReplicas.excessReplicas(), is(0)); assertThat(numberReplicas.excessReplicas(), is(0));
assertThat(numberReplicas.corruptReplicas(), is(0)); assertThat(numberReplicas.corruptReplicas(), is(0));
@ -230,7 +233,7 @@ public class TestReadOnlySharedStorage {
cluster.getNameNode(), normalDataNode.getXferAddr()); cluster.getNameNode(), normalDataNode.getXferAddr());
// The live replica count should now be zero (since the NORMAL replica is offline) // The live replica count should now be zero (since the NORMAL replica is offline)
NumberReplicas numberReplicas = blockManager.countNodes(block); NumberReplicas numberReplicas = blockManager.countNodes(storedBlock);
assertThat(numberReplicas.liveReplicas(), is(0)); assertThat(numberReplicas.liveReplicas(), is(0));
// The block should be reported as under-replicated // The block should be reported as under-replicated
@ -263,7 +266,7 @@ public class TestReadOnlySharedStorage {
waitForLocations(1); waitForLocations(1);
// However, the corrupt READ_ONLY_SHARED replica should *not* affect the overall corrupt replicas count // However, the corrupt READ_ONLY_SHARED replica should *not* affect the overall corrupt replicas count
NumberReplicas numberReplicas = blockManager.countNodes(block); NumberReplicas numberReplicas = blockManager.countNodes(storedBlock);
assertThat(numberReplicas.corruptReplicas(), is(0)); assertThat(numberReplicas.corruptReplicas(), is(0));
} }

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas; import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.junit.Test; import org.junit.Test;
@ -260,7 +261,9 @@ public class TestProcessCorruptBlocks {
} }
private static NumberReplicas countReplicas(final FSNamesystem namesystem, ExtendedBlock block) { private static NumberReplicas countReplicas(final FSNamesystem namesystem, ExtendedBlock block) {
return namesystem.getBlockManager().countNodes(block.getLocalBlock()); final BlockManager blockManager = namesystem.getBlockManager();
return blockManager.countNodes(blockManager.getStoredBlock(
block.getLocalBlock()));
} }
private void corruptBlock(MiniDFSCluster cluster, FileSystem fs, final Path fileName, private void corruptBlock(MiniDFSCluster cluster, FileSystem fs, final Path fileName,