diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java index 30b5ee715f1..4a85efbcbcd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java @@ -18,11 +18,13 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; + import java.io.DataOutput; import java.io.IOException; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE; + /** * Subclass of {@link BlockInfo}, presenting a block group in erasure coding. * @@ -37,7 +39,6 @@ import java.io.IOException; * array to record the block index for each triplet. */ public class BlockInfoStriped extends BlockInfo { - private final int chunkSize = HdfsConstants.BLOCK_STRIPED_CHUNK_SIZE; private final short dataBlockNum; private final short parityBlockNum; /** @@ -132,6 +133,22 @@ public class BlockInfoStriped extends BlockInfo { return i == -1 ? -1 : indices[i]; } + /** + * Identify the block stored in the given datanode storage. Note that + * the returned block has the same block Id with the one seen/reported by the + * DataNode. + */ + Block getBlockOnStorage(DatanodeStorageInfo storage) { + int index = getStorageBlockIndex(storage); + if (index < 0) { + return null; + } else { + Block block = new Block(this); + block.setBlockId(this.getBlockId() + index); + return block; + } + } + @Override boolean removeStorage(DatanodeStorageInfo storage) { int dnIndex = findStorageInfoFromEnd(storage); @@ -186,8 +203,8 @@ public class BlockInfoStriped extends BlockInfo { // In case striped blocks, total usage by this striped blocks should // be the total of data blocks and parity blocks because // `getNumBytes` is the total of actual data block size. - return ((getNumBytes() - 1) / (dataBlockNum * chunkSize) + 1) - * chunkSize * parityBlockNum + getNumBytes(); + return ((getNumBytes() - 1) / (dataBlockNum * BLOCK_STRIPED_CHUNK_SIZE) + 1) + * BLOCK_STRIPED_CHUNK_SIZE * parityBlockNum + getNumBytes(); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 363e687a368..b943ba4221f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -179,7 +179,11 @@ public class BlockManager { /** Store blocks -> datanodedescriptor(s) map of corrupt replicas */ final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap(); - /** Blocks to be invalidated. */ + /** + * Blocks to be invalidated. + * For a striped block to invalidate, we should track its individual internal + * blocks. + */ private final InvalidateBlocks invalidateBlocks; /** @@ -195,8 +199,8 @@ public class BlockManager { * Maps a StorageID to the set of blocks that are "extra" for this * DataNode. We'll eventually remove these extras. 
*/ - public final Map> excessReplicateMap = - new TreeMap>(); + public final Map> excessReplicateMap = + new TreeMap<>(); /** * Store set of Blocks that need to be replicated 1 or more times. @@ -594,11 +598,11 @@ public class BlockManager { ((BlockInfoStriped) block).getDataBlockNum() : minReplication; } - public boolean checkMinStorage(BlockInfo block) { + public boolean hasMinStorage(BlockInfo block) { return countNodes(block).liveReplicas() >= getMinStorageNum(block); } - public boolean checkMinStorage(BlockInfo block, int liveNum) { + public boolean hasMinStorage(BlockInfo block, int liveNum) { return liveNum >= getMinStorageNum(block); } @@ -643,7 +647,7 @@ public class BlockManager { return false; // already completed (e.g. by syncBlock) final boolean b = commitBlock(lastBlock, commitBlock); - if (checkMinStorage(lastBlock)) { + if (hasMinStorage(lastBlock)) { completeBlock(bc, bc.numBlocks() - 1, false); } return b; @@ -667,7 +671,7 @@ public class BlockManager { } int numNodes = curBlock.numNodes(); - if (!force && !checkMinStorage(curBlock, numNodes)) { + if (!force && !hasMinStorage(curBlock, numNodes)) { throw new IOException("Cannot complete block: " + "block does not satisfy minimal replication requirement."); } @@ -765,7 +769,7 @@ public class BlockManager { // count in safe-mode. namesystem.adjustSafeModeBlockTotals( // decrement safe if we had enough - checkMinStorage(oldBlock, targets.length) ? -1 : 0, + hasMinStorage(oldBlock, targets.length) ? -1 : 0, // always decrement total blocks -1); @@ -1099,7 +1103,7 @@ public class BlockManager { /** Remove the blocks associated to the given datanode. */ void removeBlocksAssociatedTo(final DatanodeDescriptor node) { - final Iterator it = node.getBlockIterator(); + final Iterator it = node.getBlockIterator(); while(it.hasNext()) { removeStoredBlock(it.next(), node); } @@ -1113,10 +1117,10 @@ public class BlockManager { /** Remove the blocks associated to the given DatanodeStorageInfo. */ void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) { assert namesystem.hasWriteLock(); - final Iterator it = storageInfo.getBlockIterator(); + final Iterator it = storageInfo.getBlockIterator(); DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); while(it.hasNext()) { - Block block = it.next(); + BlockInfo block = it.next(); removeStoredBlock(block, node); invalidateBlocks.remove(node, block); } @@ -1138,21 +1142,32 @@ public class BlockManager { * Adds block to list of blocks which will be invalidated on all its * datanodes. */ - private void addToInvalidates(Block b) { + private void addToInvalidates(BlockInfo storedBlock) { if (!namesystem.isPopulatingReplQueues()) { return; } StringBuilder datanodes = new StringBuilder(); - for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) { + for(DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock, + State.NORMAL)) { final DatanodeDescriptor node = storage.getDatanodeDescriptor(); - invalidateBlocks.add(b, node, false); - datanodes.append(node).append(" "); + final Block b = getBlockToInvalidate(storedBlock, storage); + if (b != null) { + invalidateBlocks.add(b, node, false); + datanodes.append(node).append(" "); + } } if (datanodes.length() != 0) { - blockLog.info("BLOCK* addToInvalidates: {} {}", b, datanodes.toString()); + blockLog.info("BLOCK* addToInvalidates: {} {}", storedBlock, + datanodes.toString()); } } + private Block getBlockToInvalidate(BlockInfo storedBlock, + DatanodeStorageInfo storage) { + return storedBlock.isStriped() ? 
+ ((BlockInfoStriped) storedBlock).getBlockOnStorage(storage) : storedBlock; + } + /** * Remove all block invalidation tasks under this datanode UUID; * used when a datanode registers with a new UUID and the old one @@ -1210,18 +1225,18 @@ public class BlockManager { DatanodeStorageInfo storageInfo, DatanodeDescriptor node) throws IOException { - if (b.corrupted.isDeleted()) { + if (b.stored.isDeleted()) { blockLog.info("BLOCK markBlockAsCorrupt: {} cannot be marked as" + " corrupt as it does not belong to any file", b); addToInvalidates(b.corrupted, node); return; } short expectedReplicas = - b.corrupted.getBlockCollection().getPreferredBlockReplication(); + b.stored.getBlockCollection().getPreferredBlockReplication(); // Add replica to the data-node if it is not already there if (storageInfo != null) { - storageInfo.addBlock(b.stored, b.reportedBlock); + storageInfo.addBlock(b.stored, b.corrupted); } // Add this replica to corruptReplicas Map @@ -1231,8 +1246,10 @@ public class BlockManager { NumberReplicas numberOfReplicas = countNodes(b.stored); boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >= expectedReplicas; - boolean minReplicationSatisfied = checkMinStorage(b.stored, + + boolean minReplicationSatisfied = hasMinStorage(b.stored, numberOfReplicas.liveReplicas()); + boolean hasMoreCorruptReplicas = minReplicationSatisfied && (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) > expectedReplicas; @@ -1424,7 +1441,7 @@ public class BlockManager { if (numEffectiveReplicas >= requiredReplication) { if ( (pendingReplications.getNumReplicas(block) > 0) || - (blockHasEnoughRacks(block)) ) { + (blockHasEnoughRacks(block, requiredReplication)) ) { neededReplications.remove(block, priority); // remove from neededReplications blockLog.info("BLOCK* Removing {} from neededReplications as" + " it has enough replicas", block); @@ -1507,7 +1524,7 @@ public class BlockManager { if (numEffectiveReplicas >= requiredReplication) { if ( (pendingReplications.getNumReplicas(block) > 0) || - (blockHasEnoughRacks(block)) ) { + (blockHasEnoughRacks(block, requiredReplication)) ) { neededReplications.remove(block, priority); // remove from neededReplications rw.targets = null; blockLog.info("BLOCK* Removing {} from neededReplications as" + @@ -1517,7 +1534,7 @@ public class BlockManager { } if ( (numReplicas.liveReplicas() >= requiredReplication) && - (!blockHasEnoughRacks(block)) ) { + (!blockHasEnoughRacks(block, requiredReplication)) ) { if (rw.srcNodes[0].getNetworkLocation().equals( targets[0].getDatanodeDescriptor().getNetworkLocation())) { //No use continuing, unless a new rack in this case @@ -1711,7 +1728,7 @@ public class BlockManager { getStorageBlockIndex(storage)); } final DatanodeDescriptor node = storage.getDatanodeDescriptor(); - LightWeightLinkedSet excessBlocks = + LightWeightLinkedSet excessBlocks = excessReplicateMap.get(node.getDatanodeUuid()); int countableReplica = storage.getState() == State.NORMAL ? 1 : 0; if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) @@ -1847,39 +1864,32 @@ public class BlockManager { * list of blocks that should be considered corrupt due to a block report. */ private static class BlockToMarkCorrupt { - /** The corrupted block in a datanode. */ - final BlockInfo corrupted; + /** + * The corrupted block in a datanode. This is the one reported by the + * datanode. + */ + final Block corrupted; /** The corresponding block stored in the BlockManager. 
*/ final BlockInfo stored; - /** The block reported from a datanode */ - final Block reportedBlock; /** The reason to mark corrupt. */ final String reason; /** The reason code to be stored */ final Reason reasonCode; - BlockToMarkCorrupt(Block reported, BlockInfo corrupted, - BlockInfo stored, String reason, Reason reasonCode) { - Preconditions.checkNotNull(reported, "reported is null"); + BlockToMarkCorrupt(Block corrupted, BlockInfo stored, String reason, + Reason reasonCode) { Preconditions.checkNotNull(corrupted, "corrupted is null"); Preconditions.checkNotNull(stored, "stored is null"); - this.reportedBlock = reported; this.corrupted = corrupted; this.stored = stored; this.reason = reason; this.reasonCode = reasonCode; } - BlockToMarkCorrupt(Block reported, BlockInfo stored, String reason, - Reason reasonCode) { - this(reported, stored, stored, reason, reasonCode); - } - - BlockToMarkCorrupt(Block reported, BlockInfo stored, long gs, + BlockToMarkCorrupt(Block corrupted, BlockInfo stored, long gs, String reason, Reason reasonCode) { - this(reported, BlockInfo.copyOf(stored), stored, reason, - reasonCode); + this(corrupted, stored, reason, reasonCode); //the corrupted block in datanode has a different generation stamp corrupted.setGenerationStamp(gs); } @@ -2098,10 +2108,10 @@ public class BlockManager { // between the old and new block report. // Collection toAdd = new LinkedList<>(); - Collection toRemove = new TreeSet(); - Collection toInvalidate = new LinkedList(); - Collection toCorrupt = new LinkedList(); - Collection toUC = new LinkedList(); + Collection toRemove = new TreeSet<>(); + Collection toInvalidate = new LinkedList<>(); + Collection toCorrupt = new LinkedList<>(); + Collection toUC = new LinkedList<>(); reportDiff(storageInfo, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC); @@ -2110,7 +2120,7 @@ public class BlockManager { for (StatefulBlockInfo b : toUC) { addStoredBlockUnderConstruction(b, storageInfo); } - for (Block b : toRemove) { + for (BlockInfo b : toRemove) { removeStoredBlock(b, node); } int numBlocksLogged = 0; @@ -2250,7 +2260,7 @@ public class BlockManager { private void reportDiff(DatanodeStorageInfo storageInfo, BlockListAsLongs newReport, Collection toAdd, // add to DatanodeDescriptor - Collection toRemove, // remove from DatanodeDescriptor + Collection toRemove, // remove from DatanodeDescriptor Collection toInvalidate, // should be removed from DN Collection toCorrupt, // add to corrupt replicas list Collection toUC) { // add to under-construction list @@ -2285,8 +2295,9 @@ public class BlockManager { // collect blocks that have not been reported // all of them are next to the delimiter Iterator it = storageInfo.new BlockIterator(delimiter.getNext(0)); - while(it.hasNext()) + while (it.hasNext()) { toRemove.add(it.next()); + } storageInfo.removeBlock(delimiter); } @@ -2617,7 +2628,7 @@ public class BlockManager { // Now check for completion of blocks and safe block count int numCurrentReplica = countLiveNodes(storedBlock); if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED - && checkMinStorage(storedBlock, numCurrentReplica)) { + && hasMinStorage(storedBlock, numCurrentReplica)) { completeBlock(storedBlock.getBlockCollection(), storedBlock, false); } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) { // check whether safe replication is reached for the block @@ -2692,7 +2703,7 @@ public class BlockManager { + pendingReplications.getNumReplicas(storedBlock); if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED 
&& - checkMinStorage(storedBlock, numLiveReplicas)) { + hasMinStorage(storedBlock, numLiveReplicas)) { storedBlock = completeBlock(bc, storedBlock, false); } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) { // check whether safe replication is reached for the block @@ -2730,7 +2741,7 @@ public class BlockManager { int numCorruptNodes = num.corruptReplicas(); if (numCorruptNodes != corruptReplicasCount) { LOG.warn("Inconsistent number of corrupt replicas for " + - storedBlock + "blockMap has " + numCorruptNodes + + storedBlock + ". blockMap has " + numCorruptNodes + " but corrupt replicas map has " + corruptReplicasCount); } if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) { @@ -3004,14 +3015,14 @@ public class BlockManager { * If there are any extras, call chooseExcessReplicates() to * mark them in the excessReplicateMap. */ - private void processOverReplicatedBlock(final Block block, + private void processOverReplicatedBlock(final BlockInfo block, final short replication, final DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) { assert namesystem.hasWriteLock(); if (addedNode == delNodeHint) { delNodeHint = null; } - Collection nonExcess = new ArrayList(); + Collection nonExcess = new ArrayList<>(); Collection corruptNodes = corruptReplicas .getNodes(block); for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) { @@ -3025,8 +3036,8 @@ public class BlockManager { postponeBlock(block); return; } - LightWeightLinkedSet excessBlocks = excessReplicateMap.get(cur - .getDatanodeUuid()); + LightWeightLinkedSet excessBlocks = excessReplicateMap.get( + cur.getDatanodeUuid()); if (excessBlocks == null || !excessBlocks.contains(block)) { if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) { // exclude corrupt replicas @@ -3056,22 +3067,22 @@ public class BlockManager { * then pick a node with least free space */ private void chooseExcessReplicates(final Collection nonExcess, - Block b, short replication, + BlockInfo storedBlock, short replication, DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint, BlockPlacementPolicy replicator) { assert namesystem.hasWriteLock(); // first form a rack to datanodes map and - BlockCollection bc = getBlockCollection(b); - final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID()); + BlockCollection bc = getBlockCollection(storedBlock); + final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy( + bc.getStoragePolicyID()); final List excessTypes = storagePolicy.chooseExcess( replication, DatanodeStorageInfo.toStorageTypes(nonExcess)); - final Map> rackMap - = new HashMap>(); - final List moreThanOne = new ArrayList(); - final List exactlyOne = new ArrayList(); + final Map> rackMap = new HashMap<>(); + final List moreThanOne = new ArrayList<>(); + final List exactlyOne = new ArrayList<>(); // split nodes into two sets // moreThanOne contains nodes on rack with more than one replica @@ -3092,7 +3103,7 @@ public class BlockManager { moreThanOne, excessTypes)) { cur = delNodeHintStorage; } else { // regular excessive replica removal - cur = replicator.chooseReplicaToDelete(bc, b, replication, + cur = replicator.chooseReplicaToDelete(bc, storedBlock, replication, moreThanOne, exactlyOne, excessTypes); } firstOne = false; @@ -3102,7 +3113,7 @@ public class BlockManager { exactlyOne, cur); nonExcess.remove(cur); - addToExcessReplicate(cur.getDatanodeDescriptor(), b); + addToExcessReplicate(cur.getDatanodeDescriptor(), 
storedBlock); // // The 'excessblocks' tracks blocks until we get confirmation @@ -3111,11 +3122,12 @@ public class BlockManager { // // The 'invalidate' list is used to inform the datanode the block // should be deleted. Items are removed from the invalidate list - // upon giving instructions to the namenode. + // upon giving instructions to the datanodes. // - addToInvalidates(b, cur.getDatanodeDescriptor()); + final Block blockToInvalidate = getBlockToInvalidate(storedBlock, cur); + addToInvalidates(blockToInvalidate, cur.getDatanodeDescriptor()); blockLog.info("BLOCK* chooseExcessReplicates: " - +"({}, {}) is added to invalidated blocks set", cur, b); + +"({}, {}) is added to invalidated blocks set", cur, storedBlock); } } @@ -3140,17 +3152,18 @@ public class BlockManager { } } - private void addToExcessReplicate(DatanodeInfo dn, Block block) { + private void addToExcessReplicate(DatanodeInfo dn, BlockInfo storedBlock) { assert namesystem.hasWriteLock(); - LightWeightLinkedSet excessBlocks = excessReplicateMap.get(dn.getDatanodeUuid()); + LightWeightLinkedSet excessBlocks = excessReplicateMap.get( + dn.getDatanodeUuid()); if (excessBlocks == null) { - excessBlocks = new LightWeightLinkedSet(); + excessBlocks = new LightWeightLinkedSet<>(); excessReplicateMap.put(dn.getDatanodeUuid(), excessBlocks); } - if (excessBlocks.add(block)) { + if (excessBlocks.add(storedBlock)) { excessBlocksCount.incrementAndGet(); blockLog.debug("BLOCK* addToExcessReplicate: ({}, {}) is added to" - + " excessReplicateMap", dn, block); + + " excessReplicateMap", dn, storedBlock); } } @@ -3169,14 +3182,13 @@ public class BlockManager { * Modify (block-->datanode) map. Possibly generate replication tasks, if the * removed block is still valid. */ - public void removeStoredBlock(Block block, DatanodeDescriptor node) { - blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node); + public void removeStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) { + blockLog.debug("BLOCK* removeStoredBlock: {} from {}", storedBlock, node); assert (namesystem.hasWriteLock()); { - BlockInfo storedBlock = getStoredBlock(block); if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) { blockLog.debug("BLOCK* removeStoredBlock: {} has already been" + - " removed from node {}", block, node); + " removed from node {}", storedBlock, node); return; } @@ -3186,7 +3198,7 @@ public class BlockManager { // necessary. In that case, put block on a possibly-will- // be-replicated list. // - BlockCollection bc = blocksMap.getBlockCollection(block); + BlockCollection bc = storedBlock.getBlockCollection(); if (bc != null) { namesystem.decrementSafeBlockCount(storedBlock); updateNeededReplications(storedBlock, -1, 0); @@ -3196,13 +3208,13 @@ public class BlockManager { // We've removed a block from a node, so it's definitely no longer // in "excess" there. 
// - LightWeightLinkedSet excessBlocks = excessReplicateMap.get(node - .getDatanodeUuid()); + LightWeightLinkedSet excessBlocks = excessReplicateMap.get( + node.getDatanodeUuid()); if (excessBlocks != null) { - if (excessBlocks.remove(block)) { + if (excessBlocks.remove(storedBlock)) { excessBlocksCount.decrementAndGet(); blockLog.debug("BLOCK* removeStoredBlock: {} is removed from " + - "excessBlocks", block); + "excessBlocks", storedBlock); if (excessBlocks.size() == 0) { excessReplicateMap.remove(node.getDatanodeUuid()); } @@ -3210,7 +3222,7 @@ public class BlockManager { } // Remove the replica from corruptReplicas - corruptReplicas.removeFromCorruptReplicasMap(block, node); + corruptReplicas.removeFromCorruptReplicasMap(storedBlock, node); } } @@ -3344,7 +3356,7 @@ public class BlockManager { for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) { switch (rdbi.getStatus()) { case DELETED_BLOCK: - removeStoredBlock(storageInfo, rdbi.getBlock(), node); + removeStoredBlock(storageInfo, getStoredBlock(rdbi.getBlock()), node); deleted++; break; case RECEIVED_BLOCK: @@ -3395,8 +3407,8 @@ public class BlockManager { } else if (node.isDecommissioned()) { decommissioned++; } else { - LightWeightLinkedSet blocksExcess = excessReplicateMap.get(node - .getDatanodeUuid()); + LightWeightLinkedSet blocksExcess = excessReplicateMap.get( + node.getDatanodeUuid()); if (blocksExcess != null && blocksExcess.contains(b)) { excess++; } else { @@ -3449,13 +3461,13 @@ public class BlockManager { int numOverReplicated = 0; while(it.hasNext()) { final BlockInfo block = it.next(); - BlockCollection bc = blocksMap.getBlockCollection(block); - short expectedReplication = bc.getPreferredBlockReplication(); + int expectedReplication = this.getReplication(block); NumberReplicas num = countNodes(block); int numCurrentReplica = num.liveReplicas(); if (numCurrentReplica > expectedReplication) { // over-replicated block - processOverReplicatedBlock(block, expectedReplication, null, null); + processOverReplicatedBlock(block, (short) expectedReplication, null, + null); numOverReplicated++; } } @@ -3655,21 +3667,20 @@ public class BlockManager { return toInvalidate.size(); } - boolean blockHasEnoughRacks(Block b) { + // TODO: update the enough rack logic for striped blocks + boolean blockHasEnoughRacks(BlockInfo storedBlock, int expectedStorageNum) { if (!this.shouldCheckForEnoughRacks) { return true; } boolean enoughRacks = false; - Collection corruptNodes = - corruptReplicas.getNodes(b); - int numExpectedReplicas = getReplication(b); + Collection corruptNodes = + corruptReplicas.getNodes(storedBlock); String rackName = null; - for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) { + for(DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) { final DatanodeDescriptor cur = storage.getDatanodeDescriptor(); if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) { if ((corruptNodes == null ) || !corruptNodes.contains(cur)) { - if (numExpectedReplicas == 1 || - (numExpectedReplicas > 1 && + if (expectedStorageNum == 1 || (expectedStorageNum > 1 && !datanodeManager.hasClusterEverBeenMultiRack())) { enoughRacks = true; break; @@ -3691,8 +3702,8 @@ public class BlockManager { * A block needs replication if the number of replicas is less than expected * or if it does not have enough racks. 
*/ - boolean isNeededReplication(Block b, int expected, int current) { - return current < expected || !blockHasEnoughRacks(b); + boolean isNeededReplication(BlockInfo storedBlock, int expected, int current) { + return current < expected || !blockHasEnoughRacks(storedBlock, expected); } public long getMissingBlocksCount() { @@ -3876,8 +3887,7 @@ public class BlockManager { /** * This class is used internally by {@link this#computeRecoveryWorkForBlocks} * to represent a task to recover a block through replication or erasure - * coding. Recovery is done by transferring data from {@link srcNodes} to - * {@link targets} + * coding. Recovery is done by transferring data from srcNodes to targets */ private static class BlockRecoveryWork { protected final BlockInfo block; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java index dc697f06436..37ce8e34bc5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java @@ -100,7 +100,7 @@ public class DecommissionManager { * reports or other events. Before being finally marking as decommissioned, * another check is done with the actual block map. */ - private final TreeMap> + private final TreeMap> decomNodeBlocks; /** @@ -244,12 +244,12 @@ public class DecommissionManager { } /** - * Checks whether a block is sufficiently replicated for decommissioning. - * Full-strength replication is not always necessary, hence "sufficient". + * Checks whether a block is sufficiently replicated/stored for + * decommissioning. For replicated blocks or striped blocks, full-strength + * replication or storage is not always necessary, hence "sufficient". * @return true if sufficient, else false. 
*/ - private boolean isSufficientlyReplicated(BlockInfoContiguous block, - BlockCollection bc, + private boolean isSufficient(BlockInfo block, BlockCollection bc, NumberReplicas numberReplicas) { final int numExpected = bc.getPreferredBlockReplication(); final int numLive = numberReplicas.liveReplicas(); @@ -265,18 +265,19 @@ public class DecommissionManager { if (numExpected > numLive) { if (bc.isUnderConstruction() && block.equals(bc.getLastBlock())) { // Can decom a UC block as long as there will still be minReplicas - if (numLive >= blockManager.minReplication) { + if (blockManager.hasMinStorage(block, numLive)) { LOG.trace("UC block {} sufficiently-replicated since numLive ({}) " - + ">= minR ({})", block, numLive, blockManager.minReplication); + + ">= minR ({})", block, numLive, + blockManager.getMinStorageNum(block)); return true; } else { LOG.trace("UC block {} insufficiently-replicated since numLive " + "({}) < minR ({})", block, numLive, - blockManager.minReplication); + blockManager.getMinStorageNum(block)); } } else { // Can decom a non-UC as long as the default replication is met - if (numLive >= blockManager.defaultReplication) { + if (numLive >= blockManager.getDefaultStorageNum(block)) { return true; } } @@ -412,7 +413,7 @@ public class DecommissionManager { } private void check() { - final Iterator>> + final Iterator>> it = new CyclicIteration<>(decomNodeBlocks, iterkey).iterator(); final LinkedList toRemove = new LinkedList<>(); @@ -420,10 +421,10 @@ public class DecommissionManager { && !exceededNumBlocksPerCheck() && !exceededNumNodesPerCheck()) { numNodesChecked++; - final Map.Entry> + final Map.Entry> entry = it.next(); final DatanodeDescriptor dn = entry.getKey(); - AbstractList blocks = entry.getValue(); + AbstractList blocks = entry.getValue(); boolean fullScan = false; if (blocks == null) { // This is a newly added datanode, run through its list to schedule @@ -431,14 +432,14 @@ public class DecommissionManager { // that are insufficiently replicated for further tracking LOG.debug("Newly-added node {}, doing full scan to find " + "insufficiently-replicated blocks.", dn); - blocks = handleInsufficientlyReplicated(dn); + blocks = handleInsufficientlyStored(dn); decomNodeBlocks.put(dn, blocks); fullScan = true; } else { // This is a known datanode, check if its # of insufficiently // replicated blocks has dropped to zero and if it can be decommed LOG.debug("Processing decommission-in-progress node {}", dn); - pruneSufficientlyReplicated(dn, blocks); + pruneReliableBlocks(dn, blocks); } if (blocks.size() == 0) { if (!fullScan) { @@ -450,7 +451,7 @@ public class DecommissionManager { // marking the datanode as decommissioned LOG.debug("Node {} has finished replicating current set of " + "blocks, checking with the full block map.", dn); - blocks = handleInsufficientlyReplicated(dn); + blocks = handleInsufficientlyStored(dn); decomNodeBlocks.put(dn, blocks); } // If the full scan is clean AND the node liveness is okay, @@ -491,27 +492,25 @@ public class DecommissionManager { } /** - * Removes sufficiently replicated blocks from the block list of a - * datanode. + * Removes reliable blocks from the block list of a datanode. 
   */
-  private void pruneSufficientlyReplicated(final DatanodeDescriptor datanode,
-      AbstractList<BlockInfoContiguous> blocks) {
+  private void pruneReliableBlocks(final DatanodeDescriptor datanode,
+      AbstractList<BlockInfo> blocks) {
     processBlocksForDecomInternal(datanode, blocks.iterator(), null, true);
   }
 
   /**
-   * Returns a list of blocks on a datanode that are insufficiently
-   * replicated, i.e. are under-replicated enough to prevent decommission.
+   * Returns a list of blocks on a datanode that are insufficiently replicated
+   * or require recovery, i.e. requiring recovery and should prevent
+   * decommission.
    * <p/>
- * As part of this, it also schedules replication work for - * any under-replicated blocks. + * As part of this, it also schedules replication/recovery work. * - * @param datanode - * @return List of insufficiently replicated blocks + * @return List of blocks requiring recovery */ - private AbstractList handleInsufficientlyReplicated( + private AbstractList handleInsufficientlyStored( final DatanodeDescriptor datanode) { - AbstractList insufficient = new ChunkedArrayList<>(); + AbstractList insufficient = new ChunkedArrayList<>(); processBlocksForDecomInternal(datanode, datanode.getBlockIterator(), insufficient, false); return insufficient; @@ -520,24 +519,22 @@ public class DecommissionManager { /** * Used while checking if decommission-in-progress datanodes can be marked * as decommissioned. Combines shared logic of - * pruneSufficientlyReplicated and handleInsufficientlyReplicated. + * pruneReliableBlocks and handleInsufficientlyStored. * * @param datanode Datanode * @param it Iterator over the blocks on the * datanode - * @param insufficientlyReplicated Return parameter. If it's not null, + * @param insufficientList Return parameter. If it's not null, * will contain the insufficiently * replicated-blocks from the list. - * @param pruneSufficientlyReplicated whether to remove sufficiently - * replicated blocks from the iterator - * @return true if there are under-replicated blocks in the provided block - * iterator, else false. + * @param pruneReliableBlocks whether to remove blocks reliable + * enough from the iterator */ private void processBlocksForDecomInternal( final DatanodeDescriptor datanode, - final Iterator it, - final List insufficientlyReplicated, - boolean pruneSufficientlyReplicated) { + final Iterator it, + final List insufficientList, + boolean pruneReliableBlocks) { boolean firstReplicationLog = true; int underReplicatedBlocks = 0; int decommissionOnlyReplicas = 0; @@ -552,7 +549,7 @@ public class DecommissionManager { it.remove(); continue; } - BlockCollection bc = blockManager.blocksMap.getBlockCollection(block); + BlockCollection bc = blockManager.getBlockCollection(block); if (bc == null) { // Orphan block, will be invalidated eventually. Skip. continue; @@ -560,7 +557,6 @@ public class DecommissionManager { final NumberReplicas num = blockManager.countNodes(block); final int liveReplicas = num.liveReplicas(); - final int curReplicas = liveReplicas; // Schedule under-replicated blocks for replication if not already // pending @@ -571,7 +567,7 @@ public class DecommissionManager { namesystem.isPopulatingReplQueues()) { // Process these blocks only when active NN is out of safe mode. blockManager.neededReplications.add(block, - curReplicas, + liveReplicas, num.decommissionedAndDecommissioning(), bc.getPreferredBlockReplication()); } @@ -579,17 +575,16 @@ public class DecommissionManager { // Even if the block is under-replicated, // it doesn't block decommission if it's sufficiently replicated - BlockInfoContiguous blk = (BlockInfoContiguous) block; - if (isSufficientlyReplicated(blk, bc, num)) { - if (pruneSufficientlyReplicated) { + if (isSufficient(block, bc, num)) { + if (pruneReliableBlocks) { it.remove(); } continue; } // We've found an insufficiently replicated block. 
- if (insufficientlyReplicated != null) { - insufficientlyReplicated.add(blk); + if (insufficientList != null) { + insufficientList.add(block); } // Log if this is our first time through if (firstReplicationLog) { @@ -602,7 +597,7 @@ public class DecommissionManager { if (bc.isUnderConstruction()) { underReplicatedInOpenFiles++; } - if ((curReplicas == 0) && (num.decommissionedAndDecommissioning() > 0)) { + if ((liveReplicas == 0) && (num.decommissionedAndDecommissioning() > 0)) { decommissionOnlyReplicas++; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 0460f5ce0a2..5fac43cc25b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -3273,7 +3273,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, if (trackBlockCounts) { if (b.isComplete()) { numRemovedComplete++; - if (blockManager.checkMinStorage(b, b.numNodes())) { + if (blockManager.hasMinStorage(b, b.numNodes())) { numRemovedSafe++; } } @@ -3502,7 +3502,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, curBlock = blocks[nrCompleteBlocks]; if(!curBlock.isComplete()) break; - assert blockManager.checkMinStorage(curBlock) : + assert blockManager.hasMinStorage(curBlock) : "A COMPLETE block is not minimally replicated in " + src; } @@ -3538,7 +3538,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, // If penultimate block doesn't exist then its minReplication is met boolean penultimateBlockMinStorage = penultimateBlock == null || - blockManager.checkMinStorage(penultimateBlock); + blockManager.hasMinStorage(penultimateBlock); switch(lastBlockState) { case COMPLETE: @@ -3547,7 +3547,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, case COMMITTED: // Close file if committed blocks are minimally replicated if(penultimateBlockMinStorage && - blockManager.checkMinStorage(lastBlock)) { + blockManager.hasMinStorage(lastBlock)) { finalizeINodeFileUnderConstruction(src, pendingFile, iip.getLatestSnapshotId()); NameNode.stateChangeLog.warn("BLOCK*" diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java index 1c3f075d5f4..c33667d5e00 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java @@ -100,7 +100,7 @@ public class TestNodeCount { DatanodeDescriptor nonExcessDN = null; for(DatanodeStorageInfo storage : bm.blocksMap.getStorages(block.getLocalBlock())) { final DatanodeDescriptor dn = storage.getDatanodeDescriptor(); - Collection blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid()); + Collection blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid()); if (blocks == null || !blocks.contains(block.getLocalBlock()) ) { nonExcessDN = dn; break; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
index 2d7bb440d0c..83b3aa0f6a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -42,7 +41,6 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.util.Time;
 import org.junit.Test;
 
 public class TestOverReplicatedBlocks {
@@ -185,7 +183,7 @@ public class TestOverReplicatedBlocks {
       // All replicas for deletion should be scheduled on lastDN.
       // And should not actually be deleted, because lastDN does not heartbeat.
       namesystem.readLock();
-      Collection<Block> dnBlocks =
+      Collection<BlockInfo> dnBlocks =
         namesystem.getBlockManager().excessReplicateMap.get(lastDNid);
       assertEquals("Replicas on node " + lastDNid + " should have been deleted",
           SMALL_FILE_LENGTH / SMALL_BLOCK_SIZE, dnBlocks.size());
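
Note: the patch adds BlockInfoStriped#getBlockOnStorage, which derives the block id seen by a DataNode from the block-group id plus that storage's block index, and rewrites spaceConsumed() in terms of BLOCK_STRIPED_CHUNK_SIZE. The standalone sketch below only illustrates that arithmetic; it is not Hadoop code, and the 64 KB cell size and RS-6-3 layout are assumptions for the example.

public class StripedBlockMath {
  static final long CELL_SIZE = 64 * 1024;   // assumed value of BLOCK_STRIPED_CHUNK_SIZE
  static final int DATA_BLOCKS = 6;          // assumed RS-6-3 layout
  static final int PARITY_BLOCKS = 3;

  /** Block id reported by the DataNode holding internal block {@code index}. */
  static long blockIdOnStorage(long blockGroupId, int index) {
    return blockGroupId + index;             // mirrors getBlockOnStorage()
  }

  /** Bytes charged for the group: actual data bytes plus full parity cells per stripe. */
  static long spaceConsumed(long numBytes) {
    long stripes = (numBytes - 1) / (DATA_BLOCKS * CELL_SIZE) + 1;
    return stripes * CELL_SIZE * PARITY_BLOCKS + numBytes;
  }

  public static void main(String[] args) {
    long groupId = 1L << 40;                 // arbitrary example id
    System.out.println("internal block id at index 4: " + blockIdOnStorage(groupId, 4));
    System.out.println("space consumed for 10 MB:     " + spaceConsumed(10L * 1024 * 1024));
  }
}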
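Note: the DecommissionManager change replaces the contiguous-only "sufficiently replicated" test with isSufficient(), backed by hasMinStorage/getMinStorageNum/getDefaultStorageNum, so that a striped group's floor is its data-block count. The following is a simplified, hypothetical paraphrase of that decision rule under the same assumptions as above; it is not the actual method.

public class DecomSufficiencyCheck {
  /**
   * A block stops blocking decommission once enough storages are live:
   * fully stored, or (for an under-construction tail block) at the minimum,
   * or otherwise at the default storage level.
   */
  static boolean isSufficient(int live, int expected, int defaultStorages,
      int minStorages, boolean lastBlockUnderConstruction) {
    if (live >= expected) {
      return true;                           // fully replicated/stored
    }
    if (lastBlockUnderConstruction) {
      return live >= minStorages;            // UC tail block: minimum is enough
    }
    return live >= defaultStorages;          // otherwise default level is enough
  }

  public static void main(String[] args) {
    // Striped RS-6-3 group: minimum = 6 data blocks, default = 9 internal blocks.
    System.out.println(isSufficient(7, 9, 9, 6, false));  // false: needs recovery first
    System.out.println(isSufficient(7, 9, 9, 6, true));   // true: UC block above minimum
  }
}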