From 6e3fcffe291faec40fa9214f4880a35a952836c4 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Wed, 17 Jun 2015 08:05:44 -0700 Subject: [PATCH] HDFS-8608. Merge HDFS-7912 to trunk and branch-2 (track BlockInfo instead of Block in UnderReplicatedBlocks and PendingReplicationBlocks). Contributed by Zhe Zhang. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 4 ++ .../server/blockmanagement/BlockManager.java | 54 +++++++------- .../PendingReplicationBlocks.java | 51 +++++++------- .../UnderReplicatedBlocks.java | 57 ++++++++------- .../hdfs/server/namenode/FSDirAttrOp.java | 8 +-- .../hdfs/server/namenode/FSNamesystem.java | 21 +++--- .../hadoop/hdfs/server/namenode/INode.java | 12 ++-- .../hdfs/server/namenode/INodeFile.java | 4 +- .../hdfs/server/namenode/NamenodeFsck.java | 58 +++++++-------- .../hadoop/hdfs/server/namenode/SafeMode.java | 4 +- .../blockmanagement/BlockManagerTestUtil.java | 5 +- .../blockmanagement/TestBlockManager.java | 8 +-- .../server/blockmanagement/TestNodeCount.java | 3 +- .../TestOverReplicatedBlocks.java | 5 +- .../TestPendingReplication.java | 27 ++++--- .../TestRBWBlockInvalidation.java | 4 +- .../TestReplicationPolicy.java | 70 ++++++++++--------- .../TestUnderReplicatedBlockQueues.java | 16 +++-- .../datanode/TestReadOnlySharedStorage.java | 11 +-- .../namenode/TestProcessCorruptBlocks.java | 5 +- 20 files changed, 231 insertions(+), 196 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 07cd4a8edbe..a01446a3926 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -640,6 +640,10 @@ Release 2.8.0 - UNRELEASED HDFS-7164. Feature documentation for HDFS-6581. (Arpit Agarwal) + HDFS-9608. Merge HDFS-7912 to trunk and branch-2 (track BlockInfo instead + of Block in UnderReplicatedBlocks and PendingReplicationBlocks). + (Zhe Zhang via wang) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index ebc9017a111..824801f5340 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -588,7 +588,7 @@ public class BlockManager { /** * @return true if the block has minimum replicas */ - public boolean checkMinReplication(Block block) { + public boolean checkMinReplication(BlockInfo block) { return (countNodes(block).liveReplicas() >= minReplication); } @@ -1310,7 +1310,7 @@ public class BlockManager { * @return number of blocks scheduled for replication during this iteration. */ int computeReplicationWork(int blocksToProcess) { - List> blocksToReplicate = null; + List> blocksToReplicate = null; namesystem.writeLock(); try { // Choose the blocks to be replicated @@ -1328,7 +1328,7 @@ public class BlockManager { * @return the number of blocks scheduled for replication */ @VisibleForTesting - int computeReplicationWorkForBlocks(List> blocksToReplicate) { + int computeReplicationWorkForBlocks(List> blocksToReplicate) { int requiredReplication, numEffectiveReplicas; List containingNodes; DatanodeDescriptor srcNode; @@ -1342,7 +1342,7 @@ public class BlockManager { try { synchronized (neededReplications) { for (int priority = 0; priority < blocksToReplicate.size(); priority++) { - for (Block block : blocksToReplicate.get(priority)) { + for (BlockInfo block : blocksToReplicate.get(priority)) { // block should belong to a file bc = blocksMap.getBlockCollection(block); // abandoned block or block reopened for append @@ -1426,7 +1426,7 @@ public class BlockManager { } synchronized (neededReplications) { - Block block = rw.block; + BlockInfo block = rw.block; int priority = rw.priority; // Recheck since global lock was released // block should belong to a file @@ -1688,7 +1688,7 @@ public class BlockManager { * and put them back into the neededReplication queue */ private void processPendingReplications() { - Block[] timedOutItems = pendingReplications.getTimedOutBlocks(); + BlockInfo[] timedOutItems = pendingReplications.getTimedOutBlocks(); if (timedOutItems != null) { namesystem.writeLock(); try { @@ -2895,13 +2895,13 @@ public class BlockManager { /** Set replication for the blocks. */ public void setReplication(final short oldRepl, final short newRepl, - final String src, final Block... blocks) { + final String src, final BlockInfo... blocks) { if (newRepl == oldRepl) { return; } // update needReplication priority queues - for(Block b : blocks) { + for(BlockInfo b : blocks) { updateNeededReplications(b, 0, newRepl-oldRepl); } @@ -2909,7 +2909,7 @@ public class BlockManager { // old replication > the new one; need to remove copies LOG.info("Decreasing replication from " + oldRepl + " to " + newRepl + " for " + src); - for(Block b : blocks) { + for(BlockInfo b : blocks) { processOverReplicatedBlock(b, newRepl, null, null); } } else { // replication factor is increased @@ -3092,7 +3092,8 @@ public class BlockManager { blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node); assert (namesystem.hasWriteLock()); { - if (!blocksMap.removeNode(block, node)) { + BlockInfo storedBlock = getStoredBlock(block); + if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) { blockLog.debug("BLOCK* removeStoredBlock: {} has already been" + " removed from node {}", block, node); return; @@ -3106,8 +3107,8 @@ public class BlockManager { // BlockCollection bc = blocksMap.getBlockCollection(block); if (bc != null) { - namesystem.decrementSafeBlockCount(block); - updateNeededReplications(block, -1, 0); + namesystem.decrementSafeBlockCount(storedBlock); + updateNeededReplications(storedBlock, -1, 0); } // @@ -3181,7 +3182,10 @@ public class BlockManager { // // Modify the blocks->datanode map and node's map. // - pendingReplications.decrement(block, node); + BlockInfo storedBlock = getStoredBlock(block); + if (storedBlock != null) { + pendingReplications.decrement(getStoredBlock(block), node); + } processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED, delHintNode); } @@ -3293,7 +3297,7 @@ public class BlockManager { * Return the number of nodes hosting a given block, grouped * by the state of those replicas. */ - public NumberReplicas countNodes(Block b) { + public NumberReplicas countNodes(BlockInfo b) { int decommissioned = 0; int decommissioning = 0; int live = 0; @@ -3326,12 +3330,12 @@ public class BlockManager { } /** - * Simpler, faster form of {@link #countNodes(Block)} that only returns the number + * Simpler, faster form of {@link #countNodes} that only returns the number * of live nodes. If in startup safemode (or its 30-sec extension period), * then it gains speed by ignoring issues of excess replicas or nodes * that are decommissioned or in process of becoming decommissioned. - * If not in startup, then it calls {@link #countNodes(Block)} instead. - * + * If not in startup, then it calls {@link #countNodes} instead. + * * @param b - the block being tested * @return count of live nodes for this block */ @@ -3360,10 +3364,10 @@ public class BlockManager { if (!namesystem.isPopulatingReplQueues()) { return; } - final Iterator it = srcNode.getBlockIterator(); + final Iterator it = srcNode.getBlockIterator(); int numOverReplicated = 0; while(it.hasNext()) { - final Block block = it.next(); + final BlockInfo block = it.next(); BlockCollection bc = blocksMap.getBlockCollection(block); short expectedReplication = bc.getPreferredBlockReplication(); NumberReplicas num = countNodes(block); @@ -3427,7 +3431,7 @@ public class BlockManager { return blocksMap.size(); } - public void removeBlock(Block block) { + public void removeBlock(BlockInfo block) { assert namesystem.hasWriteLock(); // No need to ACK blocks that are being removed entirely // from the namespace, since the removal of the associated @@ -3448,7 +3452,7 @@ public class BlockManager { } /** updates a block in under replication queue */ - private void updateNeededReplications(final Block block, + private void updateNeededReplications(final BlockInfo block, final int curReplicasDelta, int expectedReplicasDelta) { namesystem.writeLock(); try { @@ -3480,7 +3484,7 @@ public class BlockManager { */ public void checkReplication(BlockCollection bc) { final short expected = bc.getPreferredBlockReplication(); - for (Block block : bc.getBlocks()) { + for (BlockInfo block : bc.getBlocks()) { final NumberReplicas n = countNodes(block); if (isNeededReplication(block, expected, n.liveReplicas())) { neededReplications.add(block, n.liveReplicas(), @@ -3682,7 +3686,7 @@ public class BlockManager { /** * Return an iterator over the set of blocks for which there are no replicas. */ - public Iterator getCorruptReplicaBlockIterator() { + public Iterator getCorruptReplicaBlockIterator() { return neededReplications.iterator( UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS); } @@ -3807,7 +3811,7 @@ public class BlockManager { private static class ReplicationWork { - private final Block block; + private final BlockInfo block; private final BlockCollection bc; private final DatanodeDescriptor srcNode; @@ -3818,7 +3822,7 @@ public class BlockManager { private DatanodeStorageInfo targets[]; private final int priority; - public ReplicationWork(Block block, + public ReplicationWork(BlockInfo block, BlockCollection bc, DatanodeDescriptor srcNode, List containingNodes, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java index 796b878c92d..04232cf95e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java @@ -23,6 +23,7 @@ import java.io.PrintWriter; import java.sql.Time; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -46,8 +47,8 @@ import org.slf4j.Logger; class PendingReplicationBlocks { private static final Logger LOG = BlockManager.LOG; - private final Map pendingReplications; - private final ArrayList timedOutItems; + private final Map pendingReplications; + private final ArrayList timedOutItems; Daemon timerThread = null; private volatile boolean fsRunning = true; @@ -62,8 +63,8 @@ class PendingReplicationBlocks { if ( timeoutPeriod > 0 ) { this.timeout = timeoutPeriod; } - pendingReplications = new HashMap(); - timedOutItems = new ArrayList(); + pendingReplications = new HashMap<>(); + timedOutItems = new ArrayList<>(); } void start() { @@ -76,7 +77,7 @@ class PendingReplicationBlocks { * @param block The corresponding block * @param targets The DataNodes where replicas of the block should be placed */ - void increment(Block block, DatanodeDescriptor[] targets) { + void increment(BlockInfo block, DatanodeDescriptor[] targets) { synchronized (pendingReplications) { PendingBlockInfo found = pendingReplications.get(block); if (found == null) { @@ -93,9 +94,9 @@ class PendingReplicationBlocks { * Decrement the number of pending replication requests * for this block. * - * @param The DataNode that finishes the replication + * @param dn The DataNode that finishes the replication */ - void decrement(Block block, DatanodeDescriptor dn) { + void decrement(BlockInfo block, DatanodeDescriptor dn) { synchronized (pendingReplications) { PendingBlockInfo found = pendingReplications.get(block); if (found != null) { @@ -115,7 +116,7 @@ class PendingReplicationBlocks { * @param block The given block whose pending replication requests need to be * removed */ - void remove(Block block) { + void remove(BlockInfo block) { synchronized (pendingReplications) { pendingReplications.remove(block); } @@ -138,7 +139,7 @@ class PendingReplicationBlocks { /** * How many copies of this block is pending replication? */ - int getNumReplicas(Block block) { + int getNumReplicas(BlockInfo block) { synchronized (pendingReplications) { PendingBlockInfo found = pendingReplications.get(block); if (found != null) { @@ -153,13 +154,13 @@ class PendingReplicationBlocks { * replication requests. Returns null if no blocks have * timed out. */ - Block[] getTimedOutBlocks() { + BlockInfo[] getTimedOutBlocks() { synchronized (timedOutItems) { if (timedOutItems.size() <= 0) { return null; } - Block[] blockList = timedOutItems.toArray( - new Block[timedOutItems.size()]); + BlockInfo[] blockList = timedOutItems.toArray( + new BlockInfo[timedOutItems.size()]); timedOutItems.clear(); return blockList; } @@ -179,7 +180,7 @@ class PendingReplicationBlocks { PendingBlockInfo(DatanodeDescriptor[] targets) { this.timeStamp = monotonicNow(); this.targets = targets == null ? new ArrayList() - : new ArrayList(Arrays.asList(targets)); + : new ArrayList<>(Arrays.asList(targets)); } long getTimeStamp() { @@ -192,9 +193,7 @@ class PendingReplicationBlocks { void incrementReplicas(DatanodeDescriptor... newTargets) { if (newTargets != null) { - for (DatanodeDescriptor dn : newTargets) { - targets.add(dn); - } + Collections.addAll(targets, newTargets); } } @@ -232,17 +231,17 @@ class PendingReplicationBlocks { */ void pendingReplicationCheck() { synchronized (pendingReplications) { - Iterator> iter = + Iterator> iter = pendingReplications.entrySet().iterator(); long now = monotonicNow(); if(LOG.isDebugEnabled()) { LOG.debug("PendingReplicationMonitor checking Q"); } while (iter.hasNext()) { - Map.Entry entry = iter.next(); + Map.Entry entry = iter.next(); PendingBlockInfo pendingBlock = entry.getValue(); if (now > pendingBlock.getTimeStamp() + timeout) { - Block block = entry.getKey(); + BlockInfo block = entry.getKey(); synchronized (timedOutItems) { timedOutItems.add(block); } @@ -275,16 +274,14 @@ class PendingReplicationBlocks { synchronized (pendingReplications) { out.println("Metasave: Blocks being replicated: " + pendingReplications.size()); - Iterator> iter = - pendingReplications.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry entry = iter.next(); + for (Map.Entry entry : + pendingReplications.entrySet()) { PendingBlockInfo pendingBlock = entry.getValue(); Block block = entry.getKey(); - out.println(block + - " StartTime: " + new Time(pendingBlock.timeStamp) + - " NumReplicaInProgress: " + - pendingBlock.getNumReplicas()); + out.println(block + + " StartTime: " + new Time(pendingBlock.timeStamp) + + " NumReplicaInProgress: " + + pendingBlock.getNumReplicas()); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java index 1daa0ee2bc7..000416e1754 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; import org.apache.hadoop.hdfs.server.namenode.NameNode; @@ -35,7 +34,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; * *

* The policy for choosing which priority to give added blocks - * is implemented in {@link #getPriority(Block, int, int, int)}. + * is implemented in {@link #getPriority(int, int, int)}. *

*

The queue order is as follows:

*
    @@ -62,7 +61,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; * blocks that are not corrupt higher priority. *
*/ -class UnderReplicatedBlocks implements Iterable { +class UnderReplicatedBlocks implements Iterable { /** The total number of queues : {@value} */ static final int LEVEL = 5; /** The queue with the highest priority: {@value} */ @@ -78,8 +77,8 @@ class UnderReplicatedBlocks implements Iterable { /** The queue for corrupt blocks: {@value} */ static final int QUEUE_WITH_CORRUPT_BLOCKS = 4; /** the queues themselves */ - private final List> priorityQueues - = new ArrayList>(LEVEL); + private final List> priorityQueues + = new ArrayList<>(LEVEL); /** The number of corrupt blocks with replication factor 1 */ private int corruptReplOneBlocks = 0; @@ -87,7 +86,7 @@ class UnderReplicatedBlocks implements Iterable { /** Create an object. */ UnderReplicatedBlocks() { for (int i = 0; i < LEVEL; i++) { - priorityQueues.add(new LightWeightLinkedSet()); + priorityQueues.add(new LightWeightLinkedSet()); } } @@ -131,8 +130,8 @@ class UnderReplicatedBlocks implements Iterable { } /** Check if a block is in the neededReplication queue */ - synchronized boolean contains(Block block) { - for(LightWeightLinkedSet set : priorityQueues) { + synchronized boolean contains(BlockInfo block) { + for(LightWeightLinkedSet set : priorityQueues) { if (set.contains(block)) { return true; } @@ -141,13 +140,11 @@ class UnderReplicatedBlocks implements Iterable { } /** Return the priority of a block - * @param block a under replicated block * @param curReplicas current number of replicas of the block * @param expectedReplicas expected number of replicas of the block * @return the priority for the blocks, between 0 and ({@link #LEVEL}-1) */ - private int getPriority(Block block, - int curReplicas, + private int getPriority(int curReplicas, int decommissionedReplicas, int expectedReplicas) { assert curReplicas >= 0 : "Negative replicas!"; @@ -183,12 +180,12 @@ class UnderReplicatedBlocks implements Iterable { * @param expectedReplicas expected number of replicas of the block * @return true if the block was added to a queue. */ - synchronized boolean add(Block block, - int curReplicas, + synchronized boolean add(BlockInfo block, + int curReplicas, int decomissionedReplicas, int expectedReplicas) { assert curReplicas >= 0 : "Negative replicas!"; - int priLevel = getPriority(block, curReplicas, decomissionedReplicas, + int priLevel = getPriority(curReplicas, decomissionedReplicas, expectedReplicas); if(priorityQueues.get(priLevel).add(block)) { if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS && @@ -207,11 +204,11 @@ class UnderReplicatedBlocks implements Iterable { } /** remove a block from a under replication queue */ - synchronized boolean remove(Block block, - int oldReplicas, + synchronized boolean remove(BlockInfo block, + int oldReplicas, int decommissionedReplicas, int oldExpectedReplicas) { - int priLevel = getPriority(block, oldReplicas, + int priLevel = getPriority(oldReplicas, decommissionedReplicas, oldExpectedReplicas); boolean removedBlock = remove(block, priLevel); @@ -241,8 +238,8 @@ class UnderReplicatedBlocks implements Iterable { * @param priLevel expected privilege level * @return true if the block was found and removed from one of the priority queues */ - boolean remove(Block block, int priLevel) { - if(priLevel >= 0 && priLevel < LEVEL + boolean remove(BlockInfo block, int priLevel) { + if(priLevel >= 0 && priLevel < LEVEL && priorityQueues.get(priLevel).remove(block)) { NameNode.blockStateChangeLog.debug( "BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block {}" + @@ -279,14 +276,16 @@ class UnderReplicatedBlocks implements Iterable { * @param curReplicasDelta the change in the replicate count from before * @param expectedReplicasDelta the change in the expected replica count from before */ - synchronized void update(Block block, int curReplicas, + synchronized void update(BlockInfo block, int curReplicas, int decommissionedReplicas, int curExpectedReplicas, int curReplicasDelta, int expectedReplicasDelta) { int oldReplicas = curReplicas-curReplicasDelta; int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta; - int curPri = getPriority(block, curReplicas, decommissionedReplicas, curExpectedReplicas); - int oldPri = getPriority(block, oldReplicas, decommissionedReplicas, oldExpectedReplicas); + int curPri = getPriority(curReplicas, decommissionedReplicas, + curExpectedReplicas); + int oldPri = getPriority(oldReplicas, decommissionedReplicas, + oldExpectedReplicas); if(NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " + block + @@ -336,12 +335,12 @@ class UnderReplicatedBlocks implements Iterable { * @return Return a list of block lists to be replicated. The block list index * represents its replication priority. */ - public synchronized List> chooseUnderReplicatedBlocks( + public synchronized List> chooseUnderReplicatedBlocks( int blocksToProcess) { // initialize data structure for the return value - List> blocksToReplicate = new ArrayList>(LEVEL); + List> blocksToReplicate = new ArrayList<>(LEVEL); for (int i = 0; i < LEVEL; i++) { - blocksToReplicate.add(new ArrayList()); + blocksToReplicate.add(new ArrayList()); } if (size() == 0) { // There are no blocks to collect. @@ -364,7 +363,7 @@ class UnderReplicatedBlocks implements Iterable { // Loop through all remaining blocks in the list. while (blockCount < blocksToProcess && neededReplicationsIterator.hasNext()) { - Block block = neededReplicationsIterator.next(); + BlockInfo block = neededReplicationsIterator.next(); blocksToReplicate.get(priority).add(block); blockCount++; } @@ -396,10 +395,10 @@ class UnderReplicatedBlocks implements Iterable { /** * An iterator over blocks. */ - class BlockIterator implements Iterator { + class BlockIterator implements Iterator { private int level; private boolean isIteratorForLevel = false; - private final List> iterators = new ArrayList>(); + private final List> iterators = new ArrayList<>(); /** * Construct an iterator over all queues. @@ -431,7 +430,7 @@ class UnderReplicatedBlocks implements Iterable { } @Override - public Block next() { + public BlockInfo next() { if (isIteratorForLevel) { return iterators.get(0).next(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java index 879738d2f79..3b07320f8de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java @@ -25,12 +25,12 @@ import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.util.EnumCounters; @@ -148,8 +148,8 @@ public class FSDirAttrOp { } final short[] blockRepls = new short[2]; // 0: old, 1: new - final Block[] blocks = unprotectedSetReplication(fsd, src, replication, - blockRepls); + final BlockInfo[] blocks = unprotectedSetReplication(fsd, src, + replication, blockRepls); isFile = blocks != null; if (isFile) { fsd.getEditLog().logSetReplication(src, replication); @@ -375,7 +375,7 @@ public class FSDirAttrOp { } } - static Block[] unprotectedSetReplication( + static BlockInfo[] unprotectedSetReplication( FSDirectory fsd, String src, short replication, short[] blockRepls) throws QuotaExceededException, UnresolvedLinkException, SnapshotAccessControlException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 40c71eadef8..b97776a91f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -3170,8 +3170,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, * of blocks that need to be removed from blocksMap */ void removeBlocks(BlocksMapUpdateInfo blocks) { - List toDeleteList = blocks.getToDeleteList(); - Iterator iter = toDeleteList.iterator(); + List toDeleteList = blocks.getToDeleteList(); + Iterator iter = toDeleteList.iterator(); while (iter.hasNext()) { writeLock(); try { @@ -3227,12 +3227,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, boolean trackBlockCounts = isSafeModeTrackingBlocks(); int numRemovedComplete = 0, numRemovedSafe = 0; - for (Block b : blocks.getToDeleteList()) { + for (BlockInfo b : blocks.getToDeleteList()) { if (trackBlockCounts) { - BlockInfo bi = getStoredBlock(b); - if (bi.isComplete()) { + if (b.isComplete()) { numRemovedComplete++; - if (bi.numNodes() >= blockManager.minReplication) { + if (blockManager.checkMinReplication(b)) { numRemovedSafe++; } } @@ -4151,7 +4150,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, boolean changed = false; writeLock(); try { - final Iterator it = blockManager.getCorruptReplicaBlockIterator(); + final Iterator it = + blockManager.getCorruptReplicaBlockIterator(); while (it.hasNext()) { Block b = it.next(); @@ -5093,7 +5093,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } @Override - public void decrementSafeBlockCount(Block b) { + public void decrementSafeBlockCount(BlockInfo b) { // safeMode is volatile, and may be set to null at any time SafeModeInfo safeMode = this.safeMode; if (safeMode == null) // mostly true @@ -5918,7 +5918,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } // print a limited # of corrupt files per call - final Iterator blkIterator = blockManager.getCorruptReplicaBlockIterator(); + final Iterator blkIterator = + blockManager.getCorruptReplicaBlockIterator(); int skip = getIntCookie(cookieTab[0]); for (int i = 0; i < skip && blkIterator.hasNext(); i++) { @@ -5926,7 +5927,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, } while (blkIterator.hasNext()) { - Block blk = blkIterator.next(); + BlockInfo blk = blkIterator.next(); final INode inode = (INode)blockManager.getBlockCollection(blk); skip++; if (inode != null && blockManager.countNodes(blk).liveReplicas() == 0) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java index cf38fa50c87..8a2d17ab9a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java @@ -34,9 +34,9 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.DFSUtil; -import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.QuotaExceededException; import org.apache.hadoop.hdfs.server.namenode.INodeReference.DstReference; import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName; @@ -950,8 +950,8 @@ public abstract class INode implements INodeAttributes, Diff.Element { /** * The list of blocks that need to be removed from blocksMap */ - private final List toDeleteList; - + private final List toDeleteList; + public BlocksMapUpdateInfo() { toDeleteList = new ChunkedArrayList<>(); } @@ -959,7 +959,7 @@ public abstract class INode implements INodeAttributes, Diff.Element { /** * @return The list of blocks that need to be removed from blocksMap */ - public List getToDeleteList() { + public List getToDeleteList() { return toDeleteList; } @@ -968,12 +968,12 @@ public abstract class INode implements INodeAttributes, Diff.Element { * {@link BlocksMapUpdateInfo#toDeleteList} * @param toDelete the to-be-deleted block */ - public void addDeleteBlock(Block toDelete) { + public void addDeleteBlock(BlockInfo toDelete) { assert toDelete != null : "toDelete is null"; toDeleteList.add(toDelete); } - public void removeDeleteBlock(Block block) { + public void removeDeleteBlock(BlockInfo block) { assert block != null : "block is null"; toDeleteList.remove(block); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java index 4590eecb736..48879d7ee9a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java @@ -904,8 +904,8 @@ public class INodeFile extends INodeWithAdditionalFields getDiffs().findEarlierSnapshotBlocks(snapshotId); if(snapshotBlocks == null) return; - List toDelete = collectedBlocks.getToDeleteList(); - for(Block blk : snapshotBlocks) { + List toDelete = collectedBlocks.getToDeleteList(); + for(BlockInfo blk : snapshotBlocks) { if(toDelete.contains(blk)) collectedBlocks.removeDeleteBlock(blk); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index 39180b10d2e..f67d25a0d06 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -109,7 +109,7 @@ import com.google.common.annotations.VisibleForTesting; @InterfaceAudience.Private public class NamenodeFsck implements DataEncryptionKeyFactory { public static final Log LOG = LogFactory.getLog(NameNode.class.getName()); - + // return string marking fsck status public static final String CORRUPT_STATUS = "is CORRUPT"; public static final String HEALTHY_STATUS = "is HEALTHY"; @@ -117,7 +117,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { public static final String DECOMMISSIONED_STATUS = "is DECOMMISSIONED"; public static final String NONEXISTENT_STATUS = "does not exist"; public static final String FAILURE_STATUS = "FAILED"; - + private final NameNode namenode; private final NetworkTopology networktopology; private final int totalDatanodes; @@ -143,14 +143,14 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { */ private boolean internalError = false; - /** + /** * True if the user specified the -move option. * * Whe this option is in effect, we will copy salvaged blocks into the lost * and found. */ private boolean doMove = false; - /** + /** * True if the user specified the -delete option. * * Whe this option is in effect, we will delete corrupted files. @@ -183,7 +183,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { * @param remoteAddress source address of the fsck request */ NamenodeFsck(Configuration conf, NameNode namenode, - NetworkTopology networktopology, + NetworkTopology networktopology, Map pmap, PrintWriter out, int totalDatanodes, InetAddress remoteAddress) { this.conf = conf; @@ -199,7 +199,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { this.staleInterval = conf.getLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT); - + for (Iterator it = pmap.keySet().iterator(); it.hasNext();) { String key = it.next(); if (key.equals("path")) { this.path = pmap.get("path")[0]; } @@ -251,7 +251,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { } BlockCollection bc = bm.getBlockCollection(blockInfo); INode iNode = (INode) bc; - NumberReplicas numberReplicas= bm.countNodes(block); + NumberReplicas numberReplicas= bm.countNodes(blockInfo); out.println("Block Id: " + blockId); out.println("Block belongs to: "+iNode.getFullPathName()); out.println("No. of Expected Replica: " + @@ -350,7 +350,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { listCorruptFileBlocks(); return; } - + if (this.showStoragePolcies) { storageTypeSummary = new StoragePolicySummary( namenode.getNamesystem().getBlockManager().getStoragePolicies()); @@ -380,7 +380,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { // DFSck client scans for the string HEALTHY/CORRUPT to check the status // of file system and return appropriate code. Changing the output - // string might break testcases. Also note this must be the last line + // string might break testcases. Also note this must be the last line // of the report. if (res.isHealthy()) { out.print("\n\nThe filesystem under path '" + path + "' " + HEALTHY_STATUS); @@ -423,7 +423,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { + " CORRUPT files"); out.println(); } - + @VisibleForTesting void check(String parent, HdfsFileStatus file, Result res) throws IOException { String path = file.getFullName(parent); @@ -480,7 +480,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { throws IOException { long fileLen = file.getLen(); LocatedBlocks blocks = null; - FSNamesystem fsn = namenode.getNamesystem(); + final FSNamesystem fsn = namenode.getNamesystem(); fsn.readLock(); try { blocks = fsn.getBlockLocations( @@ -539,8 +539,10 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { ExtendedBlock block = lBlk.getBlock(); BlockManager bm = namenode.getNamesystem().getBlockManager(); + final BlockInfo storedBlock = bm.getStoredBlock( + block.getLocalBlock()); // count decommissionedReplicas / decommissioningReplicas - NumberReplicas numberReplicas = bm.countNodes(block.getLocalBlock()); + NumberReplicas numberReplicas = bm.countNodes(storedBlock); int decommissionedReplicas = numberReplicas.decommissioned();; int decommissioningReplicas = numberReplicas.decommissioning(); res.decommissionedReplicas += decommissionedReplicas; @@ -608,7 +610,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { out.println(); out.print(path + ": "); } - out.println(" Replica placement policy is violated for " + + out.println(" Replica placement policy is violated for " + block + ". " + blockPlacementStatus.getErrorDescription()); } @@ -743,7 +745,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { return false; } } - + private void copyBlocksToLostFound(String parent, HdfsFileStatus file, LocatedBlocks blocks) throws IOException { final DFSClient dfs = new DFSClient(NameNode.getAddress(conf), conf); @@ -784,7 +786,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { fos = dfs.create(target + "/" + chain, true); chain++; } - + // copy the block. It's a pity it's not abstracted from DFSInputStream ... try { copyBlock(dfs, lblock, fos); @@ -802,7 +804,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { LOG.warn("Fsck: there were errors copying the remains of the " + "corrupted file " + fullName + " to /lost+found"); } else { - LOG.info("Fsck: copied the remains of the corrupted file " + + LOG.info("Fsck: copied the remains of the corrupted file " + fullName + " to /lost+found"); } } catch (Exception e) { @@ -813,7 +815,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { dfs.close(); } } - + /* * XXX (ab) Bulk of this method is copied verbatim from {@link DFSClient}, which is * bad. Both places should be refactored to provide a method to copy blocks @@ -824,12 +826,12 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { int failures = 0; InetSocketAddress targetAddr = null; TreeSet deadNodes = new TreeSet(); - BlockReader blockReader = null; - ExtendedBlock block = lblock.getBlock(); + BlockReader blockReader = null; + ExtendedBlock block = lblock.getBlock(); while (blockReader == null) { DatanodeInfo chosenNode; - + try { chosenNode = bestNode(dfs, lblock.getLocations(), deadNodes); targetAddr = NetUtils.createSocketAddr(chosenNode.getXferAddr()); @@ -900,7 +902,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { bytesRead += cnt; } if ( bytesRead != block.getNumBytes() ) { - throw new IOException("Recorded block size is " + block.getNumBytes() + + throw new IOException("Recorded block size is " + block.getNumBytes() + ", but datanode returned " +bytesRead+" bytes"); } } catch (Exception e) { @@ -937,12 +939,12 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { } while (deadNodes.contains(chosenNode)); return chosenNode; } - + private void lostFoundInit(DFSClient dfs) { lfInited = true; try { String lfName = "/lost+found"; - + final HdfsFileStatus lfStatus = dfs.getFileInfo(lfName); if (lfStatus == null) { // not exists lfInitedOk = dfs.mkdirs(lfName, null, true); @@ -997,21 +999,21 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { final short replication; final int minReplication; - + Result(Configuration conf) { - this.replication = (short)conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, + this.replication = (short)conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, DFSConfigKeys.DFS_REPLICATION_DEFAULT); this.minReplication = (short)conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT); } - + /** * DFS is considered healthy if there are no missing blocks. */ boolean isHealthy() { return ((missingIds.size() == 0) && (corruptBlocks == 0)); } - + /** Add a missing block name, plus its size. */ void addMissing(String id, long size) { missingIds.add(id); @@ -1030,7 +1032,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory { return 0.0f; return (float) (totalReplicas) / (float) totalBlocks; } - + @Override public String toString() { StringBuilder res = new StringBuilder(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java index 95fc06bd8ce..14284821aa3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeMode.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; /** SafeMode related operations. */ @InterfaceAudience.Private @@ -49,5 +49,5 @@ public interface SafeMode { public void incrementSafeBlockCount(int replication); /** Decrement number of blocks that reached minimal replication. */ - public void decrementSafeBlockCount(Block b); + public void decrementSafeBlockCount(BlockInfo b); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java index 23e610f8415..148135bae97 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java @@ -69,9 +69,10 @@ public class BlockManagerTestUtil { final BlockManager bm = namesystem.getBlockManager(); namesystem.readLock(); try { + final BlockInfo storedBlock = bm.getStoredBlock(b); return new int[]{getNumberOfRacks(bm, b), - bm.countNodes(b).liveReplicas(), - bm.neededReplications.contains(b) ? 1 : 0}; + bm.countNodes(storedBlock).liveReplicas(), + bm.neededReplications.contains(storedBlock) ? 1 : 0}; } finally { namesystem.readUnlock(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 5a82b15635f..396dff302a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -440,14 +440,14 @@ public class TestBlockManager { return blockInfo; } - private DatanodeStorageInfo[] scheduleSingleReplication(Block block) { + private DatanodeStorageInfo[] scheduleSingleReplication(BlockInfo block) { // list for priority 1 - List list_p1 = new ArrayList(); + List list_p1 = new ArrayList<>(); list_p1.add(block); // list of lists for each priority - List> list_all = new ArrayList>(); - list_all.add(new ArrayList()); // for priority 0 + List> list_all = new ArrayList<>(); + list_all.add(new ArrayList()); // for priority 0 list_all.add(list_p1); // for priority 1 assertEquals("Block not initially pending replication", 0, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java index c3726f2a543..1c3f075d5f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java @@ -166,10 +166,11 @@ public class TestNodeCount { /* threadsafe read of the replication counts for this block */ NumberReplicas countNodes(Block block, FSNamesystem namesystem) { + BlockManager blockManager = namesystem.getBlockManager(); namesystem.readLock(); try { lastBlock = block; - lastNum = namesystem.getBlockManager().countNodes(block); + lastNum = blockManager.countNodes(blockManager.getStoredBlock(block)); return lastNum; } finally { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java index a86b57347f3..2d7bb440d0c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java @@ -117,7 +117,8 @@ public class TestOverReplicatedBlocks { // corrupt one won't be chosen to be excess one // without 4910 the number of live replicas would be 0: block gets lost - assertEquals(1, bm.countNodes(block.getLocalBlock()).liveReplicas()); + assertEquals(1, bm.countNodes( + bm.getStoredBlock(block.getLocalBlock())).liveReplicas()); } } finally { namesystem.writeUnlock(); @@ -219,7 +220,7 @@ public class TestOverReplicatedBlocks { out.close(); ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, p); assertEquals("Expected only one live replica for the block", 1, bm - .countNodes(block.getLocalBlock()).liveReplicas()); + .countNodes(bm.getStoredBlock(block.getLocalBlock())).liveReplicas()); } finally { cluster.shutdown(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java index 090c6e89fc4..dae23d85844 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java @@ -53,6 +53,12 @@ public class TestPendingReplication { private static final int DFS_REPLICATION_INTERVAL = 1; // Number of datanodes in the cluster private static final int DATANODE_COUNT = 5; + + private BlockInfo genBlockInfo(long id, long length, long gs) { + return new BlockInfoContiguous(new Block(id, length, gs), + (short) DATANODE_COUNT); + } + @Test public void testPendingReplication() { PendingReplicationBlocks pendingReplications; @@ -63,7 +69,7 @@ public class TestPendingReplication { // DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos(10); for (int i = 0; i < storages.length; i++) { - Block block = new Block(i, i, 0); + BlockInfo block = genBlockInfo(i, i, 0); DatanodeStorageInfo[] targets = new DatanodeStorageInfo[i]; System.arraycopy(storages, 0, targets, 0, i); pendingReplications.increment(block, @@ -76,7 +82,7 @@ public class TestPendingReplication { // // remove one item and reinsert it // - Block blk = new Block(8, 8, 0); + BlockInfo blk = genBlockInfo(8, 8, 0); pendingReplications.decrement(blk, storages[7].getDatanodeDescriptor()); // removes one replica assertEquals("pendingReplications.getNumReplicas ", 7, pendingReplications.getNumReplicas(blk)); @@ -96,7 +102,7 @@ public class TestPendingReplication { // are sane. // for (int i = 0; i < 10; i++) { - Block block = new Block(i, i, 0); + BlockInfo block = genBlockInfo(i, i, 0); int numReplicas = pendingReplications.getNumReplicas(block); assertTrue(numReplicas == i); } @@ -115,7 +121,7 @@ public class TestPendingReplication { } for (int i = 10; i < 15; i++) { - Block block = new Block(i, i, 0); + BlockInfo block = genBlockInfo(i, i, 0); pendingReplications.increment(block, DatanodeStorageInfo.toDatanodeDescriptors( DFSTestUtil.createDatanodeStorageInfos(i))); @@ -180,7 +186,7 @@ public class TestPendingReplication { block = new Block(1, 1, 0); blockInfo = new BlockInfoContiguous(block, (short) 3); - pendingReplications.increment(block, + pendingReplications.increment(blockInfo, DatanodeStorageInfo.toDatanodeDescriptors( DFSTestUtil.createDatanodeStorageInfos(1))); BlockCollection bc = Mockito.mock(BlockCollection.class); @@ -195,7 +201,8 @@ public class TestPendingReplication { // Add a second block to pendingReplications that has no // corresponding entry in blocksmap block = new Block(2, 2, 0); - pendingReplications.increment(block, + blockInfo = new BlockInfoContiguous(block, (short) 3); + pendingReplications.increment(blockInfo, DatanodeStorageInfo.toDatanodeDescriptors( DFSTestUtil.createDatanodeStorageInfos(1))); @@ -275,7 +282,7 @@ public class TestPendingReplication { assertEquals(1, blkManager.pendingReplications.size()); INodeFile fileNode = fsn.getFSDirectory().getINode4Write(file).asFile(); - Block[] blocks = fileNode.getBlocks(); + BlockInfo[] blocks = fileNode.getBlocks(); assertEquals(DATANODE_COUNT - 1, blkManager.pendingReplications.getNumReplicas(blocks[0])); @@ -381,9 +388,9 @@ public class TestPendingReplication { BlockManagerTestUtil.computeAllPendingWork(bm); BlockManagerTestUtil.updateState(bm); assertEquals(bm.getPendingReplicationBlocksCount(), 1L); - assertEquals(bm.pendingReplications.getNumReplicas(block.getBlock() - .getLocalBlock()), 2); - + BlockInfo storedBlock = bm.getStoredBlock(block.getBlock().getLocalBlock()); + assertEquals(bm.pendingReplications.getNumReplicas(storedBlock), 2); + // 4. delete the file fs.delete(filePath, true); // retry at most 10 times, each time sleep for 1s. Note that 10s is much diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java index 728934d65d1..1a32892a943 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestRBWBlockInvalidation.java @@ -58,7 +58,9 @@ public class TestRBWBlockInvalidation { private static NumberReplicas countReplicas(final FSNamesystem namesystem, ExtendedBlock block) { - return namesystem.getBlockManager().countNodes(block.getLocalBlock()); + final BlockManager blockManager = namesystem.getBlockManager(); + return blockManager.countNodes(blockManager.getStoredBlock( + block.getLocalBlock())); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index 6e98538b55d..28129572370 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -830,7 +830,11 @@ public class TestReplicationPolicy { assertEquals(targets.length, 2); assertTrue(isOnSameRack(targets[0], dataNodes[2])); } - + + private BlockInfo genBlockInfo(long id) { + return new BlockInfoContiguous(new Block(id), (short) 3); + } + /** * Test for the high priority blocks are processed before the low priority * blocks. @@ -849,16 +853,17 @@ public class TestReplicationPolicy { .getNamesystem().getBlockManager().neededReplications; for (int i = 0; i < 100; i++) { // Adding the blocks directly to normal priority - neededReplications.add(new Block(ThreadLocalRandom.current() - .nextLong()), 2, 0, 3); + + neededReplications.add(genBlockInfo(ThreadLocalRandom.current(). + nextLong()), 2, 0, 3); } // Lets wait for the replication interval, to start process normal // priority blocks Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL); // Adding the block directly to high priority list - neededReplications.add(new Block(ThreadLocalRandom.current().nextLong()), - 1, 0, 3); + neededReplications.add(genBlockInfo(ThreadLocalRandom.current(). + nextLong()), 1, 0, 3); // Lets wait for the replication interval Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL); @@ -881,30 +886,31 @@ public class TestReplicationPolicy { for (int i = 0; i < 5; i++) { // Adding QUEUE_HIGHEST_PRIORITY block - underReplicatedBlocks.add(new Block(ThreadLocalRandom.current() - .nextLong()), 1, 0, 3); + underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current(). + nextLong()), 1, 0, 3); // Adding QUEUE_VERY_UNDER_REPLICATED block - underReplicatedBlocks.add(new Block(ThreadLocalRandom.current() - .nextLong()), 2, 0, 7); + underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current(). + nextLong()), 2, 0, 7); // Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block - underReplicatedBlocks.add(new Block(ThreadLocalRandom.current() - .nextLong()), 6, 0, 6); + underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current(). + nextLong()), 6, 0, 6); // Adding QUEUE_UNDER_REPLICATED block - underReplicatedBlocks.add(new Block(ThreadLocalRandom.current() - .nextLong()), 5, 0, 6); + underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current(). + nextLong()), 5, 0, 6); // Adding QUEUE_WITH_CORRUPT_BLOCKS block - underReplicatedBlocks.add(new Block(ThreadLocalRandom.current() - .nextLong()), 0, 0, 3); + underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current(). + nextLong()), 0, 0, 3); } // Choose 6 blocks from UnderReplicatedBlocks. Then it should pick 5 blocks // from // QUEUE_HIGHEST_PRIORITY and 1 block from QUEUE_VERY_UNDER_REPLICATED. - List> chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(6); + List> chosenBlocks = + underReplicatedBlocks.chooseUnderReplicatedBlocks(6); assertTheChosenBlocks(chosenBlocks, 5, 1, 0, 0, 0); // Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 4 blocks from @@ -914,8 +920,8 @@ public class TestReplicationPolicy { assertTheChosenBlocks(chosenBlocks, 0, 4, 5, 1, 0); // Adding QUEUE_HIGHEST_PRIORITY - underReplicatedBlocks.add(new Block(ThreadLocalRandom.current().nextLong()), - 1, 0, 3); + underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current(). + nextLong()), 1, 0, 3); // Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 1 block from // QUEUE_HIGHEST_PRIORITY, 4 blocks from QUEUE_REPLICAS_BADLY_DISTRIBUTED @@ -933,7 +939,7 @@ public class TestReplicationPolicy { /** asserts the chosen blocks with expected priority blocks */ private void assertTheChosenBlocks( - List> chosenBlocks, int firstPrioritySize, + List> chosenBlocks, int firstPrioritySize, int secondPrioritySize, int thirdPrioritySize, int fourthPrioritySize, int fifthPrioritySize) { assertEquals( @@ -1107,9 +1113,9 @@ public class TestReplicationPolicy { public void testUpdateDoesNotCauseSkippedReplication() { UnderReplicatedBlocks underReplicatedBlocks = new UnderReplicatedBlocks(); - Block block1 = new Block(ThreadLocalRandom.current().nextLong()); - Block block2 = new Block(ThreadLocalRandom.current().nextLong()); - Block block3 = new Block(ThreadLocalRandom.current().nextLong()); + BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong()); + BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong()); + BlockInfo block3 = genBlockInfo(ThreadLocalRandom.current().nextLong()); // Adding QUEUE_VERY_UNDER_REPLICATED block final int block1CurReplicas = 2; @@ -1123,7 +1129,7 @@ public class TestReplicationPolicy { // Adding QUEUE_UNDER_REPLICATED block underReplicatedBlocks.add(block3, 2, 0, 6); - List> chosenBlocks; + List> chosenBlocks; // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // from QUEUE_VERY_UNDER_REPLICATED. @@ -1156,8 +1162,8 @@ public class TestReplicationPolicy { BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration()); UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications; - Block block1 = new Block(ThreadLocalRandom.current().nextLong()); - Block block2 = new Block(ThreadLocalRandom.current().nextLong()); + BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong()); + BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong()); // Adding QUEUE_UNDER_REPLICATED block underReplicatedBlocks.add(block1, 0, 1, 1); @@ -1165,7 +1171,7 @@ public class TestReplicationPolicy { // Adding QUEUE_UNDER_REPLICATED block underReplicatedBlocks.add(block2, 0, 1, 1); - List> chosenBlocks; + List> chosenBlocks; // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // from QUEUE_VERY_UNDER_REPLICATED. @@ -1203,8 +1209,8 @@ public class TestReplicationPolicy { BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration()); UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications; - Block block1 = new Block(ThreadLocalRandom.current().nextLong()); - Block block2 = new Block(ThreadLocalRandom.current().nextLong()); + BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong()); + BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong()); // Adding QUEUE_UNDER_REPLICATED block underReplicatedBlocks.add(block1, 0, 1, 1); @@ -1212,7 +1218,7 @@ public class TestReplicationPolicy { // Adding QUEUE_UNDER_REPLICATED block underReplicatedBlocks.add(block2, 0, 1, 1); - List> chosenBlocks; + List> chosenBlocks; // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // from QUEUE_VERY_UNDER_REPLICATED. @@ -1266,8 +1272,8 @@ public class TestReplicationPolicy { BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration()); UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications; - Block block1 = new Block(ThreadLocalRandom.current().nextLong()); - Block block2 = new Block(ThreadLocalRandom.current().nextLong()); + BlockInfo block1 = genBlockInfo(ThreadLocalRandom.current().nextLong()); + BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong()); // Adding QUEUE_UNDER_REPLICATED block underReplicatedBlocks.add(block1, 0, 1, 1); @@ -1275,7 +1281,7 @@ public class TestReplicationPolicy { // Adding QUEUE_UNDER_REPLICATED block underReplicatedBlocks.add(block2, 0, 1, 1); - List> chosenBlocks; + List> chosenBlocks; // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block // from QUEUE_VERY_UNDER_REPLICATED. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java index e87a043762c..de36e077aea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java @@ -28,6 +28,10 @@ import static org.junit.Assert.fail; public class TestUnderReplicatedBlockQueues { + private BlockInfo genBlockInfo(long id) { + return new BlockInfoContiguous(new Block(id), (short) 3); + } + /** * Test that adding blocks with different replication counts puts them * into different queues @@ -36,11 +40,11 @@ public class TestUnderReplicatedBlockQueues { @Test public void testBlockPriorities() throws Throwable { UnderReplicatedBlocks queues = new UnderReplicatedBlocks(); - Block block1 = new Block(1); - Block block2 = new Block(2); - Block block_very_under_replicated = new Block(3); - Block block_corrupt = new Block(4); - Block block_corrupt_repl_one = new Block(5); + BlockInfo block1 = genBlockInfo(1); + BlockInfo block2 = genBlockInfo(2); + BlockInfo block_very_under_replicated = genBlockInfo(3); + BlockInfo block_corrupt = genBlockInfo(4); + BlockInfo block_corrupt_repl_one = genBlockInfo(5); //add a block with a single entry assertAdded(queues, block1, 1, 0, 3); @@ -82,7 +86,7 @@ public class TestUnderReplicatedBlockQueues { } private void assertAdded(UnderReplicatedBlocks queues, - Block block, + BlockInfo block, int curReplicas, int decomissionedReplicas, int expectedReplicas) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java index 8f99afba1f7..90eb7d1a1f2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestReadOnlySharedStorage.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; @@ -81,6 +82,7 @@ public class TestReadOnlySharedStorage { private DatanodeInfo readOnlyDataNode; private Block block; + private BlockInfo storedBlock; private ExtendedBlock extendedBlock; @@ -132,7 +134,8 @@ public class TestReadOnlySharedStorage { LocatedBlock locatedBlock = getLocatedBlock(); extendedBlock = locatedBlock.getBlock(); block = extendedBlock.getLocalBlock(); - + storedBlock = blockManager.getStoredBlock(block); + assertThat(locatedBlock.getLocations().length, is(1)); normalDataNode = locatedBlock.getLocations()[0]; readOnlyDataNode = datanodeManager.getDatanode(cluster.getDataNodes().get(RO_NODE_INDEX).getDatanodeId()); @@ -188,7 +191,7 @@ public class TestReadOnlySharedStorage { } private void validateNumberReplicas(int expectedReplicas) throws IOException { - NumberReplicas numberReplicas = blockManager.countNodes(block); + NumberReplicas numberReplicas = blockManager.countNodes(storedBlock); assertThat(numberReplicas.liveReplicas(), is(expectedReplicas)); assertThat(numberReplicas.excessReplicas(), is(0)); assertThat(numberReplicas.corruptReplicas(), is(0)); @@ -230,7 +233,7 @@ public class TestReadOnlySharedStorage { cluster.getNameNode(), normalDataNode.getXferAddr()); // The live replica count should now be zero (since the NORMAL replica is offline) - NumberReplicas numberReplicas = blockManager.countNodes(block); + NumberReplicas numberReplicas = blockManager.countNodes(storedBlock); assertThat(numberReplicas.liveReplicas(), is(0)); // The block should be reported as under-replicated @@ -263,7 +266,7 @@ public class TestReadOnlySharedStorage { waitForLocations(1); // However, the corrupt READ_ONLY_SHARED replica should *not* affect the overall corrupt replicas count - NumberReplicas numberReplicas = blockManager.countNodes(block); + NumberReplicas numberReplicas = blockManager.countNodes(storedBlock); assertThat(numberReplicas.corruptReplicas(), is(0)); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java index 37abc5b726d..228a6672c37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.junit.Test; @@ -260,7 +261,9 @@ public class TestProcessCorruptBlocks { } private static NumberReplicas countReplicas(final FSNamesystem namesystem, ExtendedBlock block) { - return namesystem.getBlockManager().countNodes(block.getLocalBlock()); + final BlockManager blockManager = namesystem.getBlockManager(); + return blockManager.countNodes(blockManager.getStoredBlock( + block.getLocalBlock())); } private void corruptBlock(MiniDFSCluster cluster, FileSystem fs, final Path fileName,