diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 40f91f9f969..59623850f56 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -689,9 +689,6 @@ Release 2.8.0 - UNRELEASED HDFS-8651. Make hadoop-hdfs-project Native code -Wall-clean (Alan Burlison via Colin P. McCabe) - HDFS-8623. Refactor NameNode handling of invalid, corrupt, and under-recovery - blocks. (Zhe Zhang via jing9) - HDFS-8653. Code cleanup for DatanodeManager, DatanodeDescriptor and DatanodeStorageInfo. (Zhe Zhang via wang) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java index 5ad992b5c72..4cc2791e754 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java @@ -172,23 +172,19 @@ public abstract class BlockInfo extends Block public abstract int numNodes(); /** - * Add a {@link DatanodeStorageInfo} location for a block - * @param storage The storage to add - * @param reportedBlock The block reported from the datanode. This is only - * used by erasure coded blocks, this block's id contains - * information indicating the index of the block in the - * corresponding block group. + * Add a {@link DatanodeStorageInfo} location for a block. */ - abstract boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock); + abstract boolean addStorage(DatanodeStorageInfo storage); /** * Remove {@link DatanodeStorageInfo} location for a block */ abstract boolean removeStorage(DatanodeStorageInfo storage); + /** * Replace the current BlockInfo with the new one in corresponding - * DatanodeStorageInfo's linked list. 
+ * DatanodeStorageInfo's linked list */ abstract void replaceBlock(BlockInfo newBlock); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java index de64ad84b86..b9abcd03f29 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguous.java @@ -45,7 +45,7 @@ public class BlockInfoContiguous extends BlockInfo { } @Override - boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) { + boolean addStorage(DatanodeStorageInfo storage) { return ContiguousBlockStorageOp.addStorage(this, storage); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java index d3cb337b121..c66675a29a4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstructionContiguous.java @@ -69,7 +69,7 @@ public class BlockInfoUnderConstructionContiguous extends } @Override - boolean addStorage(DatanodeStorageInfo storage, Block reportedBlock) { + boolean addStorage(DatanodeStorageInfo storage) { return ContiguousBlockStorageOp.addStorage(this, storage); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 3ffd1bf2659..1597f419ff9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -203,8 +203,8 @@ public class BlockManager implements BlockStatsMXBean { * Maps a StorageID to the set of blocks that are "extra" for this * DataNode. We'll eventually remove these extras. */ - public final Map<String, LightWeightLinkedSet<BlockInfo>> excessReplicateMap = - new TreeMap<>(); + public final Map<String, LightWeightLinkedSet<Block>> excessReplicateMap = + new TreeMap<String, LightWeightLinkedSet<Block>>(); /** * Store set of Blocks that need to be replicated 1 or more times. @@ -508,8 +508,8 @@ public class BlockManager implements BlockStatsMXBean { /** Dump meta data to out.
*/ public void metaSave(PrintWriter out) { assert namesystem.hasWriteLock(); - final List<DatanodeDescriptor> live = new ArrayList<>(); - final List<DatanodeDescriptor> dead = new ArrayList<>(); + final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>(); + final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>(); datanodeManager.fetchDatanodes(live, dead, false); out.println("Live Datanodes: " + live.size()); out.println("Dead Datanodes: " + dead.size()); @@ -548,8 +548,8 @@ public class BlockManager implements BlockStatsMXBean { List<DatanodeDescriptor> containingNodes = new ArrayList<DatanodeDescriptor>(); List<DatanodeStorageInfo> containingLiveReplicasNodes = - new ArrayList<>(); - + new ArrayList<DatanodeStorageInfo>(); + NumberReplicas numReplicas = new NumberReplicas(); // source node returned is not used chooseSourceDatanode(block, containingNodes, @@ -578,7 +578,7 @@ public class BlockManager implements BlockStatsMXBean { Collection<DatanodeDescriptor> corruptNodes = corruptReplicas.getNodes(block); - for (DatanodeStorageInfo storage : getStorages(block)) { + for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) { final DatanodeDescriptor node = storage.getDatanodeDescriptor(); String state = ""; if (corruptNodes != null && corruptNodes.contains(node)) { @@ -601,23 +601,11 @@ public class BlockManager implements BlockStatsMXBean { return maxReplicationStreams; } - public int getDefaultStorageNum(BlockInfo block) { - return defaultReplication; - } - - public short getMinStorageNum(BlockInfo block) { - return minReplication; - } - /** - * @return true if the block has minimum stored copies + * @return true if the block has minimum replicas */ - public boolean hasMinStorage(BlockInfo block) { - return hasMinStorage(block, countNodes(block).liveReplicas()); - } - - public boolean hasMinStorage(BlockInfo block, int liveNum) { - return liveNum >= getMinStorageNum(block); + public boolean checkMinReplication(BlockInfo block) { + return (countNodes(block).liveReplicas() >= minReplication); } /** @@ -632,9 +620,8 @@ public class BlockManager implements BlockStatsMXBean { private static boolean commitBlock( final BlockInfoUnderConstruction block, final Block commitBlock) throws IOException { - if (block.getBlockUCState() == BlockUCState.COMMITTED) { + if (block.getBlockUCState() == BlockUCState.COMMITTED) return false; - } assert block.getNumBytes() <= commitBlock.getNumBytes() : "commitBlock length is less than the stored one " + commitBlock.getNumBytes() + " vs. " + block.getNumBytes(); @@ -654,22 +641,18 @@ public class BlockManager implements BlockStatsMXBean { */ public boolean commitOrCompleteLastBlock(BlockCollection bc, Block commitBlock) throws IOException { - if (commitBlock == null) { + if(commitBlock == null) return false; // not committing, this is a block allocation retry - } BlockInfo lastBlock = bc.getLastBlock(); - if (lastBlock == null) { + if(lastBlock == null) return false; // no blocks in file yet - } - if (lastBlock.isComplete()) { + if(lastBlock.isComplete()) return false; // already completed (e.g. 
by syncBlock) - } - + final boolean b = commitBlock( (BlockInfoUnderConstruction) lastBlock, commitBlock); - if(hasMinStorage(lastBlock)) { + if(countNodes(lastBlock).liveReplicas() >= minReplication) completeBlock(bc, bc.numBlocks()-1, false); - } return b; } @@ -682,24 +665,20 @@ public class BlockManager implements BlockStatsMXBean { */ private BlockInfo completeBlock(final BlockCollection bc, final int blkIndex, boolean force) throws IOException { - if (blkIndex < 0) { + if(blkIndex < 0) return null; - } BlockInfo curBlock = bc.getBlocks()[blkIndex]; - if(curBlock.isComplete()) { + if(curBlock.isComplete()) return curBlock; - } BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction) curBlock; int numNodes = ucBlock.numNodes(); - if (!force && !hasMinStorage(curBlock, numNodes)) { + if (!force && numNodes < minReplication) throw new IOException("Cannot complete block: " + "block does not satisfy minimal replication requirement."); - } - if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED) { + if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED) throw new IOException( "Cannot complete block: block has not been COMMITTED by the client"); - } BlockInfo completeBlock = ucBlock.convertToCompleteBlock(); // replace penultimate block in file bc.setBlock(blkIndex, completeBlock); @@ -784,7 +763,7 @@ public class BlockManager implements BlockStatsMXBean { // count in safe-mode. namesystem.adjustSafeModeBlockTotals( // decrement safe if we had enough - hasMinStorage(oldBlock, targets.length) ? -1 : 0, + targets.length >= minReplication ? -1 : 0, // always decrement total blocks -1); @@ -798,8 +777,8 @@ public class BlockManager implements BlockStatsMXBean { */ private List getValidLocations(Block block) { final List locations - = new ArrayList<>(blocksMap.numNodes(block)); - for(DatanodeStorageInfo storage : getStorages(block)) { + = new ArrayList(blocksMap.numNodes(block)); + for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) { // filter invalidate replicas if(!invalidateBlocks.contains(storage.getDatanodeDescriptor(), block)) { locations.add(storage); @@ -812,7 +791,7 @@ public class BlockManager implements BlockStatsMXBean { final BlockInfo[] blocks, final long offset, final long length, final int nrBlocksToReturn, final AccessMode mode) throws IOException { - int curBlk; + int curBlk = 0; long curPos = 0, blkSize = 0; int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length; for (curBlk = 0; curBlk < nrBlocks; curBlk++) { @@ -825,10 +804,10 @@ public class BlockManager implements BlockStatsMXBean { } if (nrBlocks > 0 && curBlk == nrBlocks) // offset >= end of file - return Collections.emptyList(); + return Collections.emptyList(); long endOff = offset + length; - List results = new ArrayList<>(blocks.length); + List results = new ArrayList(blocks.length); do { results.add(createLocatedBlock(blocks[curBlk], curPos, mode)); curPos += blocks[curBlk].getNumBytes(); @@ -841,7 +820,7 @@ public class BlockManager implements BlockStatsMXBean { private LocatedBlock createLocatedBlock(final BlockInfo[] blocks, final long endPos, final AccessMode mode) throws IOException { - int curBlk; + int curBlk = 0; long curPos = 0; int nrBlocks = (blocks[0].getNumBytes() == 0) ? 
0 : blocks.length; for (curBlk = 0; curBlk < nrBlocks; curBlk++) { @@ -865,8 +844,8 @@ public class BlockManager implements BlockStatsMXBean { } /** @return a LocatedBlock for the given block */ - private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos) - throws IOException { + private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos + ) throws IOException { if (blk instanceof BlockInfoUnderConstruction) { if (blk.isComplete()) { throw new IOException( @@ -876,8 +855,7 @@ public class BlockManager implements BlockStatsMXBean { final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction) blk; final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations(); - final ExtendedBlock eb = - new ExtendedBlock(namesystem.getBlockPoolId(), blk); + final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk); return newLocatedBlock(eb, storages, pos, false); } @@ -897,12 +875,11 @@ public class BlockManager implements BlockStatsMXBean { final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines]; int j = 0; if (numMachines > 0) { - for(DatanodeStorageInfo storage : getStorages(blk)) { + for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) { final DatanodeDescriptor d = storage.getDatanodeDescriptor(); final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d); - if (isCorrupt || (!replicaCorrupt)) { + if (isCorrupt || (!replicaCorrupt)) machines[j++] = storage; - } } } assert j == machines.length : @@ -1076,7 +1053,7 @@ public class BlockManager implements BlockStatsMXBean { for(int i=0; i results = new ArrayList<>(); + List results = new ArrayList(); long totalSize = 0; BlockInfo curBlock; while(totalSize it = node.getBlockIterator(); + final Iterator it = node.getBlockIterator(); while(it.hasNext()) { removeStoredBlock(it.next(), node); } @@ -1114,10 +1091,10 @@ public class BlockManager implements BlockStatsMXBean { /** Remove the blocks associated to the given DatanodeStorageInfo. */ void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) { assert namesystem.hasWriteLock(); - final Iterator it = storageInfo.getBlockIterator(); + final Iterator it = storageInfo.getBlockIterator(); DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); while(it.hasNext()) { - BlockInfo block = it.next(); + Block block = it.next(); removeStoredBlock(block, node); invalidateBlocks.remove(node, block); } @@ -1139,19 +1116,18 @@ public class BlockManager implements BlockStatsMXBean { * Adds block to list of blocks which will be invalidated on all its * datanodes. 
*/ - private void addToInvalidates(BlockInfo storedBlock) { + private void addToInvalidates(Block b) { if (!namesystem.isPopulatingReplQueues()) { return; } StringBuilder datanodes = new StringBuilder(); - for(DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock, - State.NORMAL)) { + for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) { final DatanodeDescriptor node = storage.getDatanodeDescriptor(); - invalidateBlocks.add(storedBlock, node, false); + invalidateBlocks.add(b, node, false); datanodes.append(node).append(" "); } if (datanodes.length() != 0) { - blockLog.debug("BLOCK* addToInvalidates: {} {}", storedBlock, + blockLog.debug("BLOCK* addToInvalidates: {} {}", b, datanodes.toString()); } } @@ -1179,8 +1155,7 @@ public class BlockManager implements BlockStatsMXBean { public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk, final DatanodeInfo dn, String storageID, String reason) throws IOException { assert namesystem.hasWriteLock(); - final Block reportedBlock = blk.getLocalBlock(); - final BlockInfo storedBlock = getStoredBlock(reportedBlock); + final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock()); if (storedBlock == null) { // Check if the replica is in the blockMap, if not // ignore the request for now. This could happen when BlockScanner @@ -1196,8 +1171,8 @@ public class BlockManager implements BlockStatsMXBean { + " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid() + ") does not exist"); } - - markBlockAsCorrupt(new BlockToMarkCorrupt(reportedBlock, storedBlock, + + markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED), storageID == null ? null : node.getStorageInfo(storageID), node); @@ -1213,18 +1188,18 @@ public class BlockManager implements BlockStatsMXBean { DatanodeStorageInfo storageInfo, DatanodeDescriptor node) throws IOException { - if (b.stored.isDeleted()) { + if (b.corrupted.isDeleted()) { blockLog.debug("BLOCK markBlockAsCorrupt: {} cannot be marked as" + " corrupt as it does not belong to any file", b); addToInvalidates(b.corrupted, node); return; } short expectedReplicas = - getExpectedReplicaNum(b.stored.getBlockCollection(), b.stored); + b.corrupted.getBlockCollection().getPreferredBlockReplication(); // Add replica to the data-node if it is not already there if (storageInfo != null) { - storageInfo.addBlock(b.stored, b.corrupted); + storageInfo.addBlock(b.stored); } // Add this replica to corruptReplicas Map @@ -1234,8 +1209,8 @@ public class BlockManager implements BlockStatsMXBean { NumberReplicas numberOfReplicas = countNodes(b.stored); boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >= expectedReplicas; - boolean minReplicationSatisfied = hasMinStorage(b.stored, - numberOfReplicas.liveReplicas()); + boolean minReplicationSatisfied = + numberOfReplicas.liveReplicas() >= minReplication; boolean hasMoreCorruptReplicas = minReplicationSatisfied && (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) > expectedReplicas; @@ -1378,7 +1353,7 @@ public class BlockManager implements BlockStatsMXBean { int additionalReplRequired; int scheduledWork = 0; - List work = new LinkedList<>(); + List work = new LinkedList(); namesystem.writeLock(); try { @@ -1395,11 +1370,11 @@ public class BlockManager implements BlockStatsMXBean { continue; } - requiredReplication = getExpectedReplicaNum(bc, block); + requiredReplication = bc.getPreferredBlockReplication(); // get a source data-node - containingNodes = new 
ArrayList<>(); - List liveReplicaNodes = new ArrayList<>(); + containingNodes = new ArrayList(); + List liveReplicaNodes = new ArrayList(); NumberReplicas numReplicas = new NumberReplicas(); srcNode = chooseSourceDatanode( block, containingNodes, liveReplicaNodes, numReplicas, @@ -1419,7 +1394,7 @@ public class BlockManager implements BlockStatsMXBean { if (numEffectiveReplicas >= requiredReplication) { if ( (pendingReplications.getNumReplicas(block) > 0) || - (blockHasEnoughRacks(block, requiredReplication)) ) { + (blockHasEnoughRacks(block)) ) { neededReplications.remove(block, priority); // remove from neededReplications blockLog.debug("BLOCK* Removing {} from neededReplications as" + " it has enough replicas", block); @@ -1443,7 +1418,7 @@ public class BlockManager implements BlockStatsMXBean { namesystem.writeUnlock(); } - final Set excludedNodes = new HashSet<>(); + final Set excludedNodes = new HashSet(); for(ReplicationWork rw : work){ // Exclude all of the containing nodes from being targets. // This list includes decommissioning or corrupt nodes. @@ -1479,7 +1454,7 @@ public class BlockManager implements BlockStatsMXBean { rw.targets = null; continue; } - requiredReplication = getExpectedReplicaNum(bc, block); + requiredReplication = bc.getPreferredBlockReplication(); // do not schedule more if enough replicas is already pending NumberReplicas numReplicas = countNodes(block); @@ -1488,7 +1463,7 @@ public class BlockManager implements BlockStatsMXBean { if (numEffectiveReplicas >= requiredReplication) { if ( (pendingReplications.getNumReplicas(block) > 0) || - (blockHasEnoughRacks(block, requiredReplication)) ) { + (blockHasEnoughRacks(block)) ) { neededReplications.remove(block, priority); // remove from neededReplications rw.targets = null; blockLog.debug("BLOCK* Removing {} from neededReplications as" + @@ -1498,7 +1473,7 @@ public class BlockManager implements BlockStatsMXBean { } if ( (numReplicas.liveReplicas() >= requiredReplication) && - (!blockHasEnoughRacks(block, requiredReplication)) ) { + (!blockHasEnoughRacks(block)) ) { if (rw.srcNode.getNetworkLocation().equals( targets[0].getDatanodeDescriptor().getNetworkLocation())) { //No use continuing, unless a new rack in this case @@ -1613,7 +1588,7 @@ public class BlockManager implements BlockStatsMXBean { List getDatanodeDescriptors(List nodes) { List datanodeDescriptors = null; if (nodes != null) { - datanodeDescriptors = new ArrayList<>(nodes.size()); + datanodeDescriptors = new ArrayList(nodes.size()); for (int i = 0; i < nodes.size(); i++) { DatanodeDescriptor node = datanodeManager.getDatanodeDescriptor(nodes.get(i)); if (node != null) { @@ -1669,9 +1644,9 @@ public class BlockManager implements BlockStatsMXBean { int excess = 0; Collection nodesCorrupt = corruptReplicas.getNodes(block); - for(DatanodeStorageInfo storage : getStorages(block)) { + for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) { final DatanodeDescriptor node = storage.getDatanodeDescriptor(); - LightWeightLinkedSet excessBlocks = + LightWeightLinkedSet excessBlocks = excessReplicateMap.get(node.getDatanodeUuid()); int countableReplica = storage.getState() == State.NORMAL ? 1 : 0; if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) @@ -1739,7 +1714,7 @@ public class BlockManager implements BlockStatsMXBean { * Use the blockinfo from the blocksmap to be certain we're working * with the most up-to-date block information (e.g. genstamp). 
*/ - BlockInfo bi = getStoredBlock(timedOutItems[i]); + BlockInfo bi = blocksMap.getStoredBlock(timedOutItems[i]); if (bi == null) { continue; } @@ -1789,7 +1764,7 @@ public class BlockManager implements BlockStatsMXBean { final BlockInfoUnderConstruction storedBlock; final Block reportedBlock; final ReplicaState reportedState; - + StatefulBlockInfo(BlockInfoUnderConstruction storedBlock, Block reportedBlock, ReplicaState reportedState) { this.storedBlock = storedBlock; @@ -1797,34 +1772,14 @@ public class BlockManager implements BlockStatsMXBean { this.reportedState = reportedState; } } - - private static class BlockInfoToAdd { - private final BlockInfo stored; - private final Block reported; - - BlockInfoToAdd(BlockInfo stored, Block reported) { - this.stored = stored; - this.reported = reported; - } - - public BlockInfo getStored() { - return stored; - } - - public Block getReported() { - return reported; - } - } - + /** * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a * list of blocks that should be considered corrupt due to a block report. */ private static class BlockToMarkCorrupt { - /** The corrupted block in a datanode. This is the one reported by the - * datanode. - */ - final Block corrupted; + /** The corrupted block in a datanode. */ + final BlockInfo corrupted; /** The corresponding block stored in the BlockManager. */ final BlockInfo stored; /** The reason to mark corrupt. */ @@ -1832,7 +1787,7 @@ public class BlockManager implements BlockStatsMXBean { /** The reason code to be stored */ final Reason reasonCode; - BlockToMarkCorrupt(Block corrupted, + BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason, Reason reasonCode) { Preconditions.checkNotNull(corrupted, "corrupted is null"); @@ -1844,9 +1799,15 @@ public class BlockManager implements BlockStatsMXBean { this.reasonCode = reasonCode; } - BlockToMarkCorrupt(Block corrupted, BlockInfo stored, long gs, - String reason, Reason reasonCode) { - this(corrupted, stored, reason, reasonCode); + BlockToMarkCorrupt(BlockInfo stored, String reason, + Reason reasonCode) { + this(stored, stored, reason, reasonCode); + } + + BlockToMarkCorrupt(BlockInfo stored, long gs, String reason, + Reason reasonCode) { + this(new BlockInfoContiguous(stored), stored, + reason, reasonCode); //the corrupted block in datanode has a different generation stamp corrupted.setGenerationStamp(gs); } @@ -2033,7 +1994,7 @@ public class BlockManager implements BlockStatsMXBean { break; } - BlockInfo bi = getStoredBlock(b); + BlockInfo bi = blocksMap.getStoredBlock(b); if (bi == null) { if (LOG.isDebugEnabled()) { LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " + @@ -2065,7 +2026,7 @@ public class BlockManager implements BlockStatsMXBean { endPostponedMisReplicatedBlocksCount) + " blocks are removed."); } } - + private Collection processReport( final DatanodeStorageInfo storageInfo, final BlockListAsLongs report) throws IOException { @@ -2073,26 +2034,25 @@ public class BlockManager implements BlockStatsMXBean { // Modify the (block-->datanode) map, according to the difference // between the old and new block report. 
// - Collection<BlockInfoToAdd> toAdd = new LinkedList<>(); - Collection<BlockInfo> toRemove = new TreeSet<>(); - Collection<Block> toInvalidate = new LinkedList<>(); - Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<>(); - Collection<StatefulBlockInfo> toUC = new LinkedList<>(); + Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>(); + Collection<Block> toRemove = new TreeSet<Block>(); + Collection<Block> toInvalidate = new LinkedList<Block>(); + Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>(); + Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>(); reportDiff(storageInfo, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC); - + DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); // Process the blocks on each queue - for (StatefulBlockInfo b : toUC) { + for (StatefulBlockInfo b : toUC) { addStoredBlockUnderConstruction(b, storageInfo); } - for (BlockInfo b : toRemove) { + for (Block b : toRemove) { removeStoredBlock(b, node); } int numBlocksLogged = 0; - for (BlockInfoToAdd b : toAdd) { - addStoredBlock(b.getStored(), b.getReported(), storageInfo, null, - numBlocksLogged < maxNumBlocksToLog); + for (BlockInfo b : toAdd) { + addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog); numBlocksLogged++; } if (numBlocksLogged > maxNumBlocksToLog) { @@ -2113,17 +2073,17 @@ public class BlockManager implements BlockStatsMXBean { * Mark block replicas as corrupt except those on the storages in * newStorages list. */ - public void markBlockReplicasAsCorrupt(Block oldBlock, BlockInfo block, - long oldGenerationStamp, long oldNumBytes, + public void markBlockReplicasAsCorrupt(BlockInfo block, + long oldGenerationStamp, long oldNumBytes, DatanodeStorageInfo[] newStorages) throws IOException { assert namesystem.hasWriteLock(); BlockToMarkCorrupt b = null; if (block.getGenerationStamp() != oldGenerationStamp) { - b = new BlockToMarkCorrupt(oldBlock, block, oldGenerationStamp, + b = new BlockToMarkCorrupt(block, oldGenerationStamp, "genstamp does not match " + oldGenerationStamp + " : " + block.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); } else if (block.getNumBytes() != oldNumBytes) { - b = new BlockToMarkCorrupt(oldBlock, block, + b = new BlockToMarkCorrupt(block, "length does not match " + oldNumBytes + " : " + block.getNumBytes(), Reason.SIZE_MISMATCH); } else { @@ -2182,7 +2142,7 @@ public class BlockManager implements BlockStatsMXBean { continue; } - BlockInfo storedBlock = getStoredBlock(iblk); + BlockInfo storedBlock = blocksMap.getStoredBlock(iblk); // If block does not belong to any file, we are done. 
if (storedBlock == null) continue; @@ -2220,26 +2180,24 @@ public class BlockManager implements BlockStatsMXBean { } //add replica if appropriate if (reportedState == ReplicaState.FINALIZED) { - addStoredBlockImmediate(storedBlock, iblk, storageInfo); + addStoredBlockImmediate(storedBlock, storageInfo); } } } - private void reportDiff(DatanodeStorageInfo storageInfo, - BlockListAsLongs newReport, - Collection<BlockInfoToAdd> toAdd, // add to DatanodeDescriptor - Collection<BlockInfo> toRemove, // remove from DatanodeDescriptor + private void reportDiff(DatanodeStorageInfo storageInfo, + BlockListAsLongs newReport, + Collection<BlockInfo> toAdd, // add to DatanodeDescriptor + Collection<Block> toRemove, // remove from DatanodeDescriptor Collection<Block> toInvalidate, // should be removed from DN Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list Collection<StatefulBlockInfo> toUC) { // add to under-construction list - // place a delimiter in the list which separates blocks + // place a delimiter in the list which separates blocks // that have been reported from those that have not - Block delimiterBlock = new Block(); - BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock, - (short) 1); - AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock); - assert result == AddBlockResult.ADDED + BlockInfo delimiter = new BlockInfoContiguous(new Block(), (short) 1); + AddBlockResult result = storageInfo.addBlock(delimiter); + assert result == AddBlockResult.ADDED : "Delimiting block cannot be present in the node"; int headIndex = 0; //currently the delimiter is in the head of the list int curIndex; @@ -2256,8 +2214,7 @@ public class BlockManager implements BlockStatsMXBean { // move block to the head of the list if (storedBlock != null && (curIndex = storedBlock.findStorageInfo(storageInfo)) >= 0) { - headIndex = - storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex); + headIndex = storageInfo.moveBlockToHead(storedBlock, curIndex, headIndex); } } @@ -2265,9 +2222,8 @@ public class BlockManager implements BlockStatsMXBean { // all of them are next to the delimiter Iterator<BlockInfo> it = storageInfo.new BlockIterator(delimiter.getNext(0)); - while (it.hasNext()) { + while(it.hasNext()) toRemove.add(it.next()); - } storageInfo.removeBlock(delimiter); } @@ -2304,12 +2260,12 @@ public class BlockManager implements BlockStatsMXBean { */ private BlockInfo processReportedBlock( final DatanodeStorageInfo storageInfo, - final Block block, final ReplicaState reportedState, - final Collection<BlockInfoToAdd> toAdd, - final Collection<Block> toInvalidate, + final Block block, final ReplicaState reportedState, + final Collection<BlockInfo> toAdd, + final Collection<Block> toInvalidate, final Collection<BlockToMarkCorrupt> toCorrupt, final Collection<StatefulBlockInfo> toUC) { - DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor(); if(LOG.isDebugEnabled()) { @@ -2317,16 +2273,16 @@ public class BlockManager implements BlockStatsMXBean { + " on " + dn + " size " + block.getNumBytes() + " replicaState = " + reportedState); } - if (shouldPostponeBlocksFromFuture && namesystem.isGenStampInFuture(block)) { queueReportedBlock(storageInfo, block, reportedState, QUEUE_REASON_FUTURE_GENSTAMP); return null; } - // find block by blockId - BlockInfo storedBlock = getStoredBlock(block); + BlockInfo storedBlock = blocksMap.getStoredBlock(block); if(storedBlock == null) { // If blocksMap does not contain reported block id, // the replica should be removed from the data-node. 
@@ -2334,7 +2290,7 @@ public class BlockManager implements BlockStatsMXBean { return null; } BlockUCState ucState = storedBlock.getBlockUCState(); - + // Block is on the NN if(LOG.isDebugEnabled()) { LOG.debug("In memory blockUCState = " + ucState); @@ -2379,8 +2335,8 @@ public class BlockManager implements BlockStatsMXBean { // but now okay, it might need to be updated. if (reportedState == ReplicaState.FINALIZED && (storedBlock.findStorageInfo(storageInfo) == -1 || - corruptReplicas.isReplicaCorrupt(storedBlock, dn))) { - toAdd.add(new BlockInfoToAdd(storedBlock, block)); + corruptReplicas.isReplicaCorrupt(storedBlock, dn))) { + toAdd.add(storedBlock); } return storedBlock; } @@ -2426,7 +2382,7 @@ public class BlockManager implements BlockStatsMXBean { if (rbi.getReportedState() == null) { // This is a DELETE_BLOCK request DatanodeStorageInfo storageInfo = rbi.getStorageInfo(); - removeStoredBlock(getStoredBlock(rbi.getBlock()), + removeStoredBlock(rbi.getBlock(), storageInfo.getDatanodeDescriptor()); } else { processAndHandleReportedBlock(rbi.getStorageInfo(), @@ -2474,15 +2430,15 @@ public class BlockManager implements BlockStatsMXBean { case COMMITTED: if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) { final long reportedGS = reported.getGenerationStamp(); - return new BlockToMarkCorrupt(new Block(reported), storedBlock, reportedGS, + return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is " + ucState + " and reported genstamp " + reportedGS - + " does not match genstamp in block map " - + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); + + " does not match genstamp in block map " + + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); } else if (storedBlock.getNumBytes() != reported.getNumBytes()) { - return new BlockToMarkCorrupt(new Block(reported), storedBlock, + return new BlockToMarkCorrupt(storedBlock, "block is " + ucState + " and reported length " + - reported.getNumBytes() + " does not match " + - "length in block map " + storedBlock.getNumBytes(), + reported.getNumBytes() + " does not match " + + "length in block map " + storedBlock.getNumBytes(), Reason.SIZE_MISMATCH); } else { return null; // not corrupt @@ -2490,12 +2446,11 @@ public class BlockManager implements BlockStatsMXBean { case UNDER_CONSTRUCTION: if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) { final long reportedGS = reported.getGenerationStamp(); - return new BlockToMarkCorrupt(new Block(reported), storedBlock, - reportedGS, "block is " + ucState + " and reported state " - + reportedState + ", But reported genstamp " + reportedGS + return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is " + + ucState + " and reported state " + reportedState + + ", But reported genstamp " + reportedGS + " does not match genstamp in block map " - + storedBlock.getGenerationStamp(), - Reason.GENSTAMP_MISMATCH); + + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); } return null; default: @@ -2505,15 +2460,12 @@ public class BlockManager implements BlockStatsMXBean { case RWR: if (!storedBlock.isComplete()) { return null; // not corrupt - } else if (storedBlock.getGenerationStamp() != - reported.getGenerationStamp()) { + } else if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) { final long reportedGS = reported.getGenerationStamp(); - return new BlockToMarkCorrupt( - new Block(reported), storedBlock, reportedGS, - "reported " + reportedState + - " replica with genstamp " + reportedGS + - " does not match COMPLETE 
block's genstamp in block map " + - storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); + return new BlockToMarkCorrupt(storedBlock, reportedGS, + "reported " + reportedState + " replica with genstamp " + reportedGS + + " does not match COMPLETE block's genstamp in block map " + + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); } else { // COMPLETE block, same genstamp if (reportedState == ReplicaState.RBW) { // If it's a RBW report for a COMPLETE block, it may just be that @@ -2525,7 +2477,7 @@ public class BlockManager implements BlockStatsMXBean { "complete with the same genstamp"); return null; } else { - return new BlockToMarkCorrupt(new Block(reported), storedBlock, + return new BlockToMarkCorrupt(storedBlock, "reported replica has invalid state " + reportedState, Reason.INVALID_STATE); } @@ -2538,8 +2490,7 @@ public class BlockManager implements BlockStatsMXBean { " on " + dn + " size " + storedBlock.getNumBytes(); // log here at WARN level since this is really a broken HDFS invariant LOG.warn(msg); - return new BlockToMarkCorrupt(new Block(reported), storedBlock, msg, - Reason.INVALID_STATE); + return new BlockToMarkCorrupt(storedBlock, msg, Reason.INVALID_STATE); } } @@ -2572,7 +2523,7 @@ public class BlockManager implements BlockStatsMXBean { if (ucBlock.reportedState == ReplicaState.FINALIZED && (block.findStorageInfo(storageInfo) < 0)) { - addStoredBlock(block, ucBlock.reportedBlock, storageInfo, null, true); + addStoredBlock(block, storageInfo, null, true); } } @@ -2587,23 +2538,23 @@ public class BlockManager implements BlockStatsMXBean { * * @throws IOException */ - private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported, + private void addStoredBlockImmediate(BlockInfo storedBlock, DatanodeStorageInfo storageInfo) - throws IOException { + throws IOException { assert (storedBlock != null && namesystem.hasWriteLock()); - if (!namesystem.isInStartupSafeMode() + if (!namesystem.isInStartupSafeMode() || namesystem.isPopulatingReplQueues()) { - addStoredBlock(storedBlock, reported, storageInfo, null, false); + addStoredBlock(storedBlock, storageInfo, null, false); return; } // just add it - AddBlockResult result = storageInfo.addBlock(storedBlock, reported); + AddBlockResult result = storageInfo.addBlock(storedBlock); // Now check for completion of blocks and safe block count int numCurrentReplica = countLiveNodes(storedBlock); if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED - && hasMinStorage(storedBlock, numCurrentReplica)) { + && numCurrentReplica >= minReplication) { completeBlock(storedBlock.getBlockCollection(), storedBlock, false); } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) { // check whether safe replication is reached for the block @@ -2617,20 +2568,19 @@ public class BlockManager implements BlockStatsMXBean { /** * Modify (block-->datanode) map. Remove block from set of * needed replications if this takes care of the problem. - * @return the block that is stored in blocksMap. + * @return the block that is stored in blockMap. 
*/ private Block addStoredBlock(final BlockInfo block, - final Block reportedBlock, - DatanodeStorageInfo storageInfo, - DatanodeDescriptor delNodeHint, - boolean logEveryBlock) - throws IOException { + DatanodeStorageInfo storageInfo, + DatanodeDescriptor delNodeHint, + boolean logEveryBlock) + throws IOException { assert block != null && namesystem.hasWriteLock(); BlockInfo storedBlock; DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); if (block instanceof BlockInfoUnderConstruction) { //refresh our copy in case the block got completed in another thread - storedBlock = getStoredBlock(block); + storedBlock = blocksMap.getStoredBlock(block); } else { storedBlock = block; } @@ -2644,9 +2594,10 @@ public class BlockManager implements BlockStatsMXBean { return block; } BlockCollection bc = storedBlock.getBlockCollection(); + assert bc != null : "Block must belong to a file"; // add block to the datanode - AddBlockResult result = storageInfo.addBlock(storedBlock, reportedBlock); + AddBlockResult result = storageInfo.addBlock(storedBlock); int curReplicaDelta; if (result == AddBlockResult.ADDED) { @@ -2674,10 +2625,10 @@ public class BlockManager implements BlockStatsMXBean { NumberReplicas num = countNodes(storedBlock); int numLiveReplicas = num.liveReplicas(); int numCurrentReplica = numLiveReplicas - + pendingReplications.getNumReplicas(storedBlock); + + pendingReplications.getNumReplicas(storedBlock); if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED && - hasMinStorage(storedBlock, numLiveReplicas)) { + numLiveReplicas >= minReplication) { storedBlock = completeBlock(bc, storedBlock, false); } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) { // check whether safe replication is reached for the block @@ -2687,7 +2638,7 @@ public class BlockManager implements BlockStatsMXBean { // handles the safe block count maintenance. namesystem.incrementSafeBlockCount(numCurrentReplica); } - + // if file is under construction, then done for now if (bc.isUnderConstruction()) { return storedBlock; @@ -2699,7 +2650,7 @@ public class BlockManager implements BlockStatsMXBean { } // handle underReplication/overReplication - short fileReplication = getExpectedReplicaNum(bc, storedBlock); + short fileReplication = bc.getPreferredBlockReplication(); if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) { neededReplications.remove(storedBlock, numCurrentReplica, num.decommissionedAndDecommissioning(), fileReplication); @@ -2715,12 +2666,11 @@ public class BlockManager implements BlockStatsMXBean { int numCorruptNodes = num.corruptReplicas(); if (numCorruptNodes != corruptReplicasCount) { LOG.warn("Inconsistent number of corrupt replicas for " + - storedBlock + ". 
blockMap has " + numCorruptNodes + + storedBlock + "blockMap has " + numCorruptNodes + " but corrupt replicas map has " + corruptReplicasCount); } - if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) { - invalidateCorruptReplicas(storedBlock, reportedBlock); - } + if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) + invalidateCorruptReplicas(storedBlock); return storedBlock; } @@ -2752,7 +2702,7 @@ public class BlockManager implements BlockStatsMXBean { * * @param blk Block whose corrupt replicas need to be invalidated */ - private void invalidateCorruptReplicas(BlockInfo blk, Block reported) { + private void invalidateCorruptReplicas(BlockInfo blk) { Collection nodes = corruptReplicas.getNodes(blk); boolean removedFromBlocksMap = true; if (nodes == null) @@ -2762,8 +2712,8 @@ public class BlockManager implements BlockStatsMXBean { DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]); for (DatanodeDescriptor node : nodesCopy) { try { - if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null, - Reason.ANY), node)) { + if (!invalidateBlock(new BlockToMarkCorrupt(blk, null, + Reason.ANY), node)) { removedFromBlocksMap = false; } } catch (IOException e) { @@ -2931,7 +2881,7 @@ public class BlockManager implements BlockStatsMXBean { } // calculate current replication short expectedReplication = - getExpectedReplicaNum(block.getBlockCollection(), block); + block.getBlockCollection().getPreferredBlockReplication(); NumberReplicas num = countNodes(block); int numCurrentReplica = num.liveReplicas(); // add to under-replicated queue if need to be @@ -2990,14 +2940,14 @@ public class BlockManager implements BlockStatsMXBean { * If there are any extras, call chooseExcessReplicates() to * mark them in the excessReplicateMap. 
*/ - private void processOverReplicatedBlock(final BlockInfo block, + private void processOverReplicatedBlock(final Block block, final short replication, final DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) { assert namesystem.hasWriteLock(); if (addedNode == delNodeHint) { delNodeHint = null; } - Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>(); + Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>(); Collection<DatanodeDescriptor> corruptNodes = corruptReplicas .getNodes(block); for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) { @@ -3011,8 +2961,8 @@ public class BlockManager implements BlockStatsMXBean { postponeBlock(block); return; } - LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get( - cur.getDatanodeUuid()); + LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur + .getDatanodeUuid()); if (excessBlocks == null || !excessBlocks.contains(block)) { if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) { // exclude corrupt replicas @@ -3022,7 +2972,7 @@ public class BlockManager implements BlockStatsMXBean { } } } - chooseExcessReplicates(nonExcess, block, replication, + chooseExcessReplicates(nonExcess, block, replication, addedNode, delNodeHint, blockplacement); } @@ -3041,29 +2991,29 @@ public class BlockManager implements BlockStatsMXBean { * If no such a node is available, * then pick a node with least free space */ - private void chooseExcessReplicates( - final Collection<DatanodeStorageInfo> nonExcess, - BlockInfo storedBlock, short replication, - DatanodeDescriptor addedNode, - DatanodeDescriptor delNodeHint, - BlockPlacementPolicy replicator) { + private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> nonExcess, + Block b, short replication, + DatanodeDescriptor addedNode, + DatanodeDescriptor delNodeHint, + BlockPlacementPolicy replicator) { assert namesystem.hasWriteLock(); // first form a rack to datanodes map and - BlockCollection bc = getBlockCollection(storedBlock); - final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy( - bc.getStoragePolicyID()); + BlockCollection bc = getBlockCollection(b); + final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID()); final List<StorageType> excessTypes = storagePolicy.chooseExcess( replication, DatanodeStorageInfo.toStorageTypes(nonExcess)); - final Map<String, List<DatanodeStorageInfo>> rackMap = new HashMap<>(); - final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>(); - final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>(); + final Map<String, List<DatanodeStorageInfo>> rackMap + = new HashMap<String, List<DatanodeStorageInfo>>(); + final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>(); + final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>(); + // split nodes into two sets // moreThanOne contains nodes on rack with more than one replica // exactlyOne contains the remaining nodes replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne); - + // pick one node to delete that favors the delete hint // otherwise pick one with least space from priSet if it is not empty // otherwise one node with least space from remains @@ -3078,7 +3028,7 @@ public class BlockManager implements BlockStatsMXBean { moreThanOne, excessTypes)) { cur = delNodeHintStorage; } else { // regular excessive replica removal - cur = replicator.chooseReplicaToDelete(bc, storedBlock, replication, + cur = replicator.chooseReplicaToDelete(bc, b, replication, moreThanOne, exactlyOne, excessTypes); } firstOne = false; @@ -3087,27 +3037,22 @@ public class BlockManager implements BlockStatsMXBean { replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne, exactlyOne, cur); - processChosenExcessReplica(nonExcess, cur, storedBlock); - } - } + nonExcess.remove(cur); + addToExcessReplicate(cur.getDatanodeDescriptor(), b); - private void processChosenExcessReplica( - final Collection<DatanodeStorageInfo> nonExcess, - final DatanodeStorageInfo chosen, BlockInfo storedBlock) { - nonExcess.remove(chosen); - addToExcessReplicate(chosen.getDatanodeDescriptor(), storedBlock); - // - // The 'excessblocks' tracks blocks until we get confirmation - // that the datanode has deleted them; the only way we remove them - // is when we get a "removeBlock" message. - // - // The 'invalidate' list is used to inform the datanode the block - // should be deleted. Items are removed from the invalidate list - // upon giving instructions to the datanodes. - // - addToInvalidates(storedBlock, chosen.getDatanodeDescriptor()); - blockLog.debug("BLOCK* chooseExcessReplicates: " - +"({}, {}) is added to invalidated blocks set", chosen, storedBlock); + // + // The 'excessblocks' tracks blocks until we get confirmation + // that the datanode has deleted them; the only way we remove them + // is when we get a "removeBlock" message. + // + // The 'invalidate' list is used to inform the datanode the block + // should be deleted. Items are removed from the invalidate list + // upon giving instructions to the namenode. + // + addToInvalidates(b, cur.getDatanodeDescriptor()); + blockLog.debug("BLOCK* chooseExcessReplicates: " + +"({}, {}) is added to invalidated blocks set", cur, b); + } } /** Check if we can use delHint */ @@ -3131,18 +3076,17 @@ public class BlockManager implements BlockStatsMXBean { } } - private void addToExcessReplicate(DatanodeInfo dn, BlockInfo storedBlock) { + private void addToExcessReplicate(DatanodeInfo dn, Block block) { assert namesystem.hasWriteLock(); - LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get( - dn.getDatanodeUuid()); + LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getDatanodeUuid()); if (excessBlocks == null) { - excessBlocks = new LightWeightLinkedSet<>(); + excessBlocks = new LightWeightLinkedSet<Block>(); excessReplicateMap.put(dn.getDatanodeUuid(), excessBlocks); } - if (excessBlocks.add(storedBlock)) { + if (excessBlocks.add(block)) { excessBlocksCount.incrementAndGet(); blockLog.debug("BLOCK* addToExcessReplicate: ({}, {}) is added to" - + " excessReplicateMap", dn, storedBlock); + + " excessReplicateMap", dn, block); } } @@ -3154,26 +3098,26 @@ public class BlockManager implements BlockStatsMXBean { QUEUE_REASON_FUTURE_GENSTAMP); return; } - removeStoredBlock(getStoredBlock(block), node); + removeStoredBlock(block, node); } /** * Modify (block-->datanode) map. Possibly generate replication tasks, if the * removed block is still valid. 
*/ - public void removeStoredBlock(BlockInfo storedBlock, - DatanodeDescriptor node) { - blockLog.debug("BLOCK* removeStoredBlock: {} from {}", storedBlock, node); + public void removeStoredBlock(Block block, DatanodeDescriptor node) { + blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node); assert (namesystem.hasWriteLock()); { + BlockInfo storedBlock = getStoredBlock(block); if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) { blockLog.debug("BLOCK* removeStoredBlock: {} has already been" + - " removed from node {}", storedBlock, node); + " removed from node {}", block, node); return; } CachedBlock cblock = namesystem.getCacheManager().getCachedBlocks() - .get(new CachedBlock(storedBlock.getBlockId(), (short) 0, false)); + .get(new CachedBlock(block.getBlockId(), (short) 0, false)); if (cblock != null) { boolean removed = false; removed |= node.getPendingCached().remove(cblock); @@ -3181,7 +3125,7 @@ public class BlockManager implements BlockStatsMXBean { removed |= node.getPendingUncached().remove(cblock); if (removed) { blockLog.debug("BLOCK* removeStoredBlock: {} removed from caching " - + "related lists on node {}", storedBlock, node); + + "related lists on node {}", block, node); } } @@ -3191,7 +3135,7 @@ public class BlockManager implements BlockStatsMXBean { // necessary. In that case, put block on a possibly-will- // be-replicated list. // - BlockCollection bc = storedBlock.getBlockCollection(); + BlockCollection bc = blocksMap.getBlockCollection(block); if (bc != null) { namesystem.decrementSafeBlockCount(storedBlock); updateNeededReplications(storedBlock, -1, 0); @@ -3201,13 +3145,13 @@ public class BlockManager implements BlockStatsMXBean { // We've removed a block from a node, so it's definitely no longer // in "excess" there. 
// - LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get( - node.getDatanodeUuid()); + LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(node + .getDatanodeUuid()); if (excessBlocks != null) { - if (excessBlocks.remove(storedBlock)) { + if (excessBlocks.remove(block)) { excessBlocksCount.decrementAndGet(); blockLog.debug("BLOCK* removeStoredBlock: {} is removed from " + - "excessBlocks", storedBlock); + "excessBlocks", block); if (excessBlocks.size() == 0) { excessReplicateMap.remove(node.getDatanodeUuid()); } @@ -3215,7 +3159,7 @@ public class BlockManager implements BlockStatsMXBean { } // Remove the replica from corruptReplicas - corruptReplicas.removeFromCorruptReplicasMap(storedBlock, node); + corruptReplicas.removeFromCorruptReplicasMap(block, node); } } @@ -3223,7 +3167,7 @@ public class BlockManager implements BlockStatsMXBean { * Get all valid locations of the block & add the block to results * return the length of the added block; 0 if the block is not added */ - private long addBlock(BlockInfo block, List<BlockWithLocations> results) { + private long addBlock(Block block, List<BlockWithLocations> results) { final List<DatanodeStorageInfo> locations = getValidLocations(block); if(locations.size() == 0) { return 0; @@ -3275,32 +3219,31 @@ public class BlockManager implements BlockStatsMXBean { processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED, delHintNode); } - private void processAndHandleReportedBlock( DatanodeStorageInfo storageInfo, Block block, ReplicaState reportedState, DatanodeDescriptor delHintNode) throws IOException { // blockReceived reports a finalized block - Collection<BlockInfoToAdd> toAdd = new LinkedList<>(); - Collection<Block> toInvalidate = new LinkedList<>(); - Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<>(); - Collection<StatefulBlockInfo> toUC = new LinkedList<>(); + Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>(); + Collection<Block> toInvalidate = new LinkedList<Block>(); + Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>(); + Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>(); final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); - processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate, - toCorrupt, toUC); + processReportedBlock(storageInfo, block, reportedState, + toAdd, toInvalidate, toCorrupt, toUC); // the block is only in one of the to-do lists // if it is in none then data-node already has it assert toUC.size() + toAdd.size() + toInvalidate.size() + toCorrupt.size() <= 1 - : "The block should be only in one of the lists."; + : "The block should be only in one of the lists."; - for (StatefulBlockInfo b : toUC) { + for (StatefulBlockInfo b : toUC) { addStoredBlockUnderConstruction(b, storageInfo); } long numBlocksLogged = 0; - for (BlockInfoToAdd b : toAdd) { - addStoredBlock(b.getStored(), b.getReported(), storageInfo, delHintNode, - numBlocksLogged < maxNumBlocksToLog); + for (BlockInfo b : toAdd) { + addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog); numBlocksLogged++; } if (numBlocksLogged > maxNumBlocksToLog) { @@ -3365,7 +3308,7 @@ public class BlockManager implements BlockStatsMXBean { ReplicaState.RBW, null); break; default: - String msg = + String msg = "Unknown block status code reported by " + nodeID + ": " + rdbi; blockLog.warn(msg); @@ -3401,8 +3344,8 @@ public class BlockManager implements BlockStatsMXBean { } else if (node.isDecommissioned()) { decommissioned++; } else { - LightWeightLinkedSet<BlockInfo> blocksExcess = - excessReplicateMap.get(node.getDatanodeUuid()); + LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node + .getDatanodeUuid()); if (blocksExcess != null && 
blocksExcess.contains(b)) { excess++; } else { @@ -3455,13 +3398,13 @@ public class BlockManager implements BlockStatsMXBean { int numOverReplicated = 0; while(it.hasNext()) { final BlockInfo block = it.next(); - int expectedReplication = this.getReplication(block); + BlockCollection bc = blocksMap.getBlockCollection(block); + short expectedReplication = bc.getPreferredBlockReplication(); NumberReplicas num = countNodes(block); int numCurrentReplica = num.liveReplicas(); if (numCurrentReplica > expectedReplication) { // over-replicated block - processOverReplicatedBlock(block, (short) expectedReplication, null, - null); + processOverReplicatedBlock(block, expectedReplication, null, null); numOverReplicated++; } } @@ -3487,7 +3430,7 @@ public class BlockManager implements BlockStatsMXBean { if (pendingReplicationBlocksCount == 0 && underReplicatedBlocksCount == 0) { LOG.info("Node {} is dead and there are no under-replicated" + - " blocks or blocks pending replication. Safe to decommission.", + " blocks or blocks pending replication. Safe to decommission.", node); return true; } @@ -3505,12 +3448,6 @@ public class BlockManager implements BlockStatsMXBean { return blocksMap.size(); } - - /** @return an iterator of the datanodes. */ - public Iterable getStorages(final Block block) { - return blocksMap.getStorages(block); - } - public DatanodeStorageInfo[] getStorages(BlockInfo block) { final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[block.numNodes()]; int i = 0; @@ -3596,13 +3533,13 @@ public class BlockManager implements BlockStatsMXBean { String src, BlockInfo[] blocks) { for (BlockInfo b: blocks) { if (!b.isComplete()) { + final BlockInfoUnderConstruction uc = + (BlockInfoUnderConstruction)b; final int numNodes = b.numNodes(); - final int min = getMinStorageNum(b); - final BlockUCState state = b.getBlockUCState(); - LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = " + state - + ", replication# = " + numNodes - + (numNodes < min ? " < " : " >= ") - + " minimum = " + min + ") in file " + src); + LOG.info("BLOCK* " + b + " is not COMPLETE (ucState = " + + uc.getBlockUCState() + ", replication# = " + numNodes + + (numNodes < minReplication ? " < ": " >= ") + + " minimum = " + minReplication + ") in file " + src); return false; } } @@ -3613,15 +3550,15 @@ public class BlockManager implements BlockStatsMXBean { * @return 0 if the block is not found; * otherwise, return the replication factor of the block. */ - private int getReplication(BlockInfo block) { + private int getReplication(Block block) { final BlockCollection bc = blocksMap.getBlockCollection(block); - return bc == null? 0: getExpectedReplicaNum(bc, block); + return bc == null? 0: bc.getPreferredBlockReplication(); } /** - * Get blocks to invalidate for nodeId. - * in {@link #invalidateBlocks}.boolean blockHasEnoughRacks + * Get blocks to invalidate for nodeId + * in {@link #invalidateBlocks}. * * @return number of blocks scheduled for removal during this iteration. 
*/ @@ -3659,20 +3596,22 @@ public class BlockManager implements BlockStatsMXBean { return toInvalidate.size(); } - boolean blockHasEnoughRacks(BlockInfo storedBlock, int expectedStorageNum) { + boolean blockHasEnoughRacks(Block b) { if (!this.shouldCheckForEnoughRacks) { return true; } - boolean enoughRacks = false; - Collection corruptNodes = - corruptReplicas.getNodes(storedBlock); + boolean enoughRacks = false;; + Collection corruptNodes = + corruptReplicas.getNodes(b); + int numExpectedReplicas = getReplication(b); String rackName = null; - for(DatanodeStorageInfo storage : getStorages(storedBlock)) { + for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) { final DatanodeDescriptor cur = storage.getDatanodeDescriptor(); if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) { if ((corruptNodes == null ) || !corruptNodes.contains(cur)) { - if (expectedStorageNum == 1 || (expectedStorageNum > 1 && - !datanodeManager.hasClusterEverBeenMultiRack())) { + if (numExpectedReplicas == 1 || + (numExpectedReplicas > 1 && + !datanodeManager.hasClusterEverBeenMultiRack())) { enoughRacks = true; break; } @@ -3693,13 +3632,8 @@ public class BlockManager implements BlockStatsMXBean { * A block needs replication if the number of replicas is less than expected * or if it does not have enough racks. */ - boolean isNeededReplication(BlockInfo storedBlock, int expected, - int current) { - return current < expected || !blockHasEnoughRacks(storedBlock, expected); - } - - public short getExpectedReplicaNum(BlockCollection bc, BlockInfo block) { - return bc.getPreferredBlockReplication(); + boolean isNeededReplication(Block b, int expected, int current) { + return current < expected || !blockHasEnoughRacks(b); } public long getMissingBlocksCount() { @@ -3721,6 +3655,11 @@ public class BlockManager implements BlockStatsMXBean { return blocksMap.getBlockCollection(b); } + /** @return an iterator of the datanodes. */ + public Iterable getStorages(final Block block) { + return blocksMap.getStorages(block); + } + public int numCorruptReplicas(Block block) { return corruptReplicas.numCorruptReplicas(block); } @@ -3736,10 +3675,9 @@ public class BlockManager implements BlockStatsMXBean { * If a block is removed from blocksMap, remove it from excessReplicateMap. */ private void removeFromExcessReplicateMap(Block block) { - for (DatanodeStorageInfo info : getStorages(block)) { + for (DatanodeStorageInfo info : blocksMap.getStorages(block)) { String uuid = info.getDatanodeDescriptor().getDatanodeUuid(); - LightWeightLinkedSet excessReplicas = - excessReplicateMap.get(uuid); + LightWeightLinkedSet excessReplicas = excessReplicateMap.get(uuid); if (excessReplicas != null) { if (excessReplicas.remove(block)) { excessBlocksCount.decrementAndGet(); @@ -3928,7 +3866,7 @@ public class BlockManager implements BlockStatsMXBean { /** * A simple result enum for the result of - * {@link BlockManager#processMisReplicatedBlock}. + * {@link BlockManager#processMisReplicatedBlock(BlockInfo)}. */ enum MisReplicationResult { /** The block should be invalidated since it belongs to a deleted file. 
     */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 92841a634cb..216d6d2bf14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -24,7 +24,6 @@ import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
@@ -234,7 +233,7 @@ public class DatanodeStorageInfo {
     return blockPoolUsed;
   }
 
-  public AddBlockResult addBlock(BlockInfo b, Block reportedBlock) {
+  public AddBlockResult addBlock(BlockInfo b) {
     // First check whether the block belongs to a different storage
     // on the same DN.
     AddBlockResult result = AddBlockResult.ADDED;
@@ -253,18 +252,10 @@ public class DatanodeStorageInfo {
     }
 
     // add to the head of the data-node list
-    b.addStorage(this, reportedBlock);
-    insertToList(b);
-    return result;
-  }
-
-  AddBlockResult addBlock(BlockInfo b) {
-    return addBlock(b, b);
-  }
-
-  public void insertToList(BlockInfo b) {
+    b.addStorage(this);
     blockList = b.listInsert(blockList, this);
     numBlocks++;
+    return result;
   }
 
   public boolean removeBlock(BlockInfo b) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index e3717abbe6c..3d176b052d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -142,6 +142,7 @@ import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
@@ -2790,7 +2791,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     if (trackBlockCounts) {
       if (b.isComplete()) {
         numRemovedComplete++;
-        if (blockManager.hasMinStorage(b)) {
+        if (blockManager.checkMinReplication(b)) {
           numRemovedSafe++;
         }
       }
@@ -3022,7 +3023,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       curBlock = blocks[nrCompleteBlocks];
       if(!curBlock.isComplete())
         break;
-      assert blockManager.hasMinStorage(curBlock) :
+      assert blockManager.checkMinReplication(curBlock) :
               "A COMPLETE block is not minimally replicated in " + src;
     }
 
@@ -3058,7 +3059,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     // If penultimate block doesn't exist then its minReplication is met
     boolean penultimateBlockMinReplication = penultimateBlock == null ?
         true :
-        blockManager.hasMinStorage(penultimateBlock);
+        blockManager.checkMinReplication(penultimateBlock);
 
     switch(lastBlockState) {
     case COMPLETE:
@@ -3067,7 +3068,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     case COMMITTED:
       // Close file if committed blocks are minimally replicated
       if(penultimateBlockMinReplication &&
-          blockManager.hasMinStorage(lastBlock)) {
+          blockManager.checkMinReplication(lastBlock)) {
         finalizeINodeFileUnderConstruction(src, pendingFile,
             iip.getLatestSnapshotId());
         NameNode.stateChangeLog.warn("BLOCK*"
@@ -3359,9 +3360,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
       if (storageInfo != null) {
         if(copyTruncate) {
-          storageInfo.addBlock(truncatedBlock, truncatedBlock);
+          storageInfo.addBlock(truncatedBlock);
         } else {
-          storageInfo.addBlock(storedBlock, storedBlock);
+          storageInfo.addBlock(storedBlock);
         }
       }
     }
@@ -3377,9 +3378,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       } else {
         iFile.setLastBlock(storedBlock, trimmedStorageInfos);
         if (closeFile) {
-          blockManager.markBlockReplicasAsCorrupt(oldBlock.getLocalBlock(),
-              storedBlock, oldGenerationStamp, oldNumBytes,
-              trimmedStorageInfos);
+          blockManager.markBlockReplicasAsCorrupt(storedBlock,
+              oldGenerationStamp, oldNumBytes, trimmedStorageInfos);
         }
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index ab179b454c0..7d4cd7e5205 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -647,7 +647,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
                   .getStorageType()));
             }
             if (showReplicaDetails) {
-              LightWeightLinkedSet<BlockInfo> blocksExcess =
+              LightWeightLinkedSet<Block> blocksExcess =
                   bm.excessReplicateMap.get(dnDesc.getDatanodeUuid());
               Collection<DatanodeDescriptor> corruptReplicas =
                   bm.getCorruptReplicas(block.getLocalBlock());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
index bae4f1d41bb..5126aa78dfb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
@@ -63,7 +63,7 @@ public class TestBlockInfo {
 
     final DatanodeStorageInfo storage = DFSTestUtil.createDatanodeStorageInfo("storageID", "127.0.0.1");
 
-    boolean added = blockInfo.addStorage(storage, blockInfo);
+    boolean added = blockInfo.addStorage(storage);
 
     Assert.assertTrue(added);
     Assert.assertEquals(storage, blockInfo.getStorageInfo(0));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 9e316708541..396dff302a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -383,7 +383,7 @@ public class TestBlockManager {
     for (int i = 1; i < pipeline.length; i++) {
       DatanodeStorageInfo storage = pipeline[i];
       bm.addBlock(storage, blockInfo, null);
-      blockInfo.addStorage(storage, blockInfo);
+      blockInfo.addStorage(storage);
     }
   }
 
@@ -393,7 +393,7 @@ public class TestBlockManager {
 
     for (DatanodeDescriptor dn : nodes) {
       for (DatanodeStorageInfo storage : dn.getStorageInfos()) {
-        blockInfo.addStorage(storage, blockInfo);
+        blockInfo.addStorage(storage);
       }
     }
     return blockInfo;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
index c33667d5e00..1c3f075d5f4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java
@@ -100,7 +100,7 @@ public class TestNodeCount {
     DatanodeDescriptor nonExcessDN = null;
     for(DatanodeStorageInfo storage : bm.blocksMap.getStorages(block.getLocalBlock())) {
       final DatanodeDescriptor dn = storage.getDatanodeDescriptor();
-      Collection<BlockInfo> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid());
+      Collection<Block> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid());
       if (blocks == null || !blocks.contains(block.getLocalBlock()) ) {
         nonExcessDN = dn;
         break;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
index 83b3aa0f6a1..2d7bb440d0c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.util.Time;
 import org.junit.Test;
 
 public class TestOverReplicatedBlocks {
@@ -183,7 +185,7 @@ public class TestOverReplicatedBlocks {
       // All replicas for deletion should be scheduled on lastDN.
       // And should not actually be deleted, because lastDN does not heartbeat.
       namesystem.readLock();
-      Collection<BlockInfo> dnBlocks =
+      Collection<Block> dnBlocks = 
         namesystem.getBlockManager().excessReplicateMap.get(lastDNid);
       assertEquals("Replicas on node " + lastDNid + " should have been deleted",
           SMALL_FILE_LENGTH / SMALL_BLOCK_SIZE, dnBlocks.size());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index 44f0e65f38b..28129572370 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -1250,7 +1250,7 @@ public class TestReplicationPolicy {
     when(storage.removeBlock(any(BlockInfo.class))).thenReturn(true);
     when(storage.addBlock(any(BlockInfo.class))).thenReturn
         (DatanodeStorageInfo.AddBlockResult.ADDED);
-    ucBlock.addStorage(storage, ucBlock);
+    ucBlock.addStorage(storage);
 
     when(mbc.setLastBlock((BlockInfo) any(), (DatanodeStorageInfo[]) any()))
         .thenReturn(ucBlock);
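
Reviewer note (not part of the change set): a minimal sketch of the signatures this revert restores, mirroring the TestBlockInfo hunk above. It assumes the code lives in the same test package (org.apache.hadoop.hdfs.server.blockmanagement), since BlockInfo.addStorage is package-private, and uses only helpers visible in the hunks (BlockInfoContiguous, DFSTestUtil.createDatanodeStorageInfo).

    // Build a block and a storage the way the tests in this patch do.
    BlockInfo blockInfo = new BlockInfoContiguous((short) 3);
    DatanodeStorageInfo storage =
        DFSTestUtil.createDatanodeStorageInfo("storageID", "127.0.0.1");

    // After the revert, addStorage takes only the storage; the
    // reportedBlock parameter introduced by HDFS-8623 is gone.
    boolean added = blockInfo.addStorage(storage);
    assert added;

    // DatanodeStorageInfo.addBlock likewise takes just the BlockInfo and
    // links the block into this storage's list itself (the separate
    // insertToList step was folded back into addBlock).
    DatanodeStorageInfo.AddBlockResult result = storage.addBlock(blockInfo);
    assert result == DatanodeStorageInfo.AddBlockResult.ADDED;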