diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index bb50ce8820e..c31e7689d35 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -435,6 +435,9 @@ Trunk (Unreleased)
     HDFS-9818. Correctly handle EC reconstruction work caused by not enough
     racks. (jing9)
 
+    HDFS-9837. BlockManager#countNodes should be able to detect duplicated
+    internal blocks. (jing9)
+
   BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS
 
     HDFS-7347. Configurable erasure coding policy for individual files and
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
index c6e26ecc846..bad28ff0353 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java
@@ -23,6 +23,9 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
 /**
  * Subclass of {@link BlockInfo}, presenting a block group in erasure coding.
  *
@@ -227,4 +230,47 @@ public class BlockInfoStriped extends BlockInfo {
     }
     return true;
   }
+
+  static class StorageAndBlockIndex {
+    final DatanodeStorageInfo storage;
+    final byte blockIndex;
+
+    StorageAndBlockIndex(DatanodeStorageInfo storage, byte blockIndex) {
+      this.storage = storage;
+      this.blockIndex = blockIndex;
+    }
+  }
+
+  public Iterable<StorageAndBlockIndex> getStorageAndIndexInfos() {
+    return new Iterable<StorageAndBlockIndex>() {
+      @Override
+      public Iterator<StorageAndBlockIndex> iterator() {
+        return new Iterator<StorageAndBlockIndex>() {
+          private int index = 0;
+
+          @Override
+          public boolean hasNext() {
+            while (index < getCapacity() && getStorageInfo(index) == null) {
+              index++;
+            }
+            return index < getCapacity();
+          }
+
+          @Override
+          public StorageAndBlockIndex next() {
+            if (!hasNext()) {
+              throw new NoSuchElementException();
+            }
+            int i = index++;
+            return new StorageAndBlockIndex(storages[i], indices[i]);
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException("Remove is not supported");
+          }
+        };
+      }
+    };
+  }
 }
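The iterator added above visits only non-null storage slots, pairing each stored replica with its internal block index. As a minimal sketch of how a caller might consume it, the helper below collects the set of indices that currently have at least one replica (hypothetical class and method names; it assumes code living in the same blockmanagement package, since StorageAndBlockIndex and its fields are package-private):

```java
import java.util.BitSet;

// Hypothetical helper, assumed to sit in
// org.apache.hadoop.hdfs.server.blockmanagement alongside BlockInfoStriped.
class StorageIndexSketch {
  static BitSet presentIndices(BlockInfoStriped blockGroup) {
    BitSet seen = new BitSet(blockGroup.getTotalBlockNum());
    for (BlockInfoStriped.StorageAndBlockIndex si
        : blockGroup.getStorageAndIndexInfos()) {
      // si.blockIndex identifies the internal block (0..8 for RS-6-3);
      // a bit that is already set marks a duplicated internal block.
      seen.set(si.blockIndex);
    }
    return seen;
  }
}
```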
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index eaebed9057e..c0c1ada6dbc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -71,8 +71,10 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier.AccessMode;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped.StorageAndBlockIndex;
 import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult;
+import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -1816,76 +1818,62 @@ public class BlockManager implements BlockStatsMXBean {
     containingNodes.clear();
     nodesContainingLiveReplicas.clear();
     List<DatanodeDescriptor> srcNodes = new ArrayList<>();
-    int live = 0;
-    int readonly = 0;
-    int decommissioned = 0;
-    int decommissioning = 0;
-    int corrupt = 0;
-    int excess = 0;
     liveBlockIndices.clear();
     final boolean isStriped = block.isStriped();
-    Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
+    BitSet bitSet = isStriped ?
+        new BitSet(((BlockInfoStriped) block).getTotalBlockNum()) : null;
     for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
-      LightWeightHashSet<BlockInfo> excessBlocks =
-          excessReplicateMap.get(node.getDatanodeUuid());
-      int countableReplica = storage.getState() == State.NORMAL ? 1 : 0;
-      if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
-        corrupt += countableReplica;
-      else if (node.isDecommissionInProgress()) {
-        decommissioning += countableReplica;
-      } else if (node.isDecommissioned()) {
-        decommissioned += countableReplica;
-      } else if (excessBlocks != null && excessBlocks.contains(block)) {
-        excess += countableReplica;
-      } else {
+      final StoredReplicaState state = checkReplicaOnStorage(numReplicas, block,
+          storage, corruptReplicas.getNodes(block), false);
+      if (state == StoredReplicaState.LIVE) {
         nodesContainingLiveReplicas.add(storage);
-        live += countableReplica;
-      }
-      if (storage.getState() == State.READ_ONLY_SHARED) {
-        readonly++;
       }
       containingNodes.add(node);
-      // Check if this replica is corrupt
-      // If so, do not select the node as src node
-      if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
+
+      // do not select corrupted replica as src. also do not select the block
+      // that is already in the excess map
+      if (state == StoredReplicaState.CORRUPT ||
+          state == StoredReplicaState.EXCESS) {
         continue;
+      }
+
       if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY
           && !node.isDecommissionInProgress()
-          && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
-      {
+          && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
         continue; // already reached replication limit
       }
-      if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit)
-      {
+      if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit) {
         continue;
       }
-      // the block must not be scheduled for removal on srcNode
-      if(excessBlocks != null && excessBlocks.contains(block))
-        continue;
+
       // never use already decommissioned nodes
-      if(node.isDecommissioned())
+      if (node.isDecommissioned()) {
         continue;
+      }
+
       if(isStriped || srcNodes.isEmpty()) {
         srcNodes.add(node);
         if (isStriped) {
-          liveBlockIndices.add(((BlockInfoStriped) block).
-              getStorageBlockIndex(storage));
+          byte blockIndex = ((BlockInfoStriped) block).
+              getStorageBlockIndex(storage);
+          liveBlockIndices.add(blockIndex);
+          if (!bitSet.get(blockIndex)) {
+            bitSet.set(blockIndex);
+          } else if (state == StoredReplicaState.LIVE) {
+            numReplicas.subtract(StoredReplicaState.LIVE, 1);
+            numReplicas.add(StoredReplicaState.REDUNDANT, 1);
+          }
         }
         continue;
       }
       // for replicated block, switch to a different node randomly
       // this to prevent from deterministically selecting the same node even
       // if the node failed to replicate the block on previous iterations
-      if (!isStriped && ThreadLocalRandom.current().nextBoolean()) {
+      if (ThreadLocalRandom.current().nextBoolean()) {
         srcNodes.set(0, node);
       }
     }
-    if(numReplicas != null)
-      numReplicas.set(live, readonly, decommissioned, decommissioning, corrupt,
-          excess, 0);
     return srcNodes.toArray(new DatanodeDescriptor[srcNodes.size()]);
   }
 
@@ -2872,8 +2860,8 @@ public class BlockManager implements BlockStatsMXBean {
     // Now check for completion of blocks and safe block count
     NumberReplicas num = countNodes(storedBlock);
     int numLiveReplicas = num.liveReplicas();
-    int numCurrentReplica = numLiveReplicas
-      + pendingReplications.getNumReplicas(storedBlock);
+    int pendingNum = pendingReplications.getNumReplicas(storedBlock);
+    int numCurrentReplica = numLiveReplicas + pendingNum;
 
     if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED &&
         hasMinStorage(storedBlock, numLiveReplicas)) {
@@ -2907,7 +2895,7 @@ public class BlockManager implements BlockStatsMXBean {
     } else {
       updateNeededReplications(storedBlock, curReplicaDelta, 0);
     }
-    if (numCurrentReplica > fileReplication) {
+    if (shouldProcessOverReplicated(num, pendingNum, fileReplication)) {
       processOverReplicatedBlock(storedBlock, fileReplication, node, delNodeHint);
     }
     // If the file replication has reached desired value
@@ -2925,6 +2913,13 @@ public class BlockManager implements BlockStatsMXBean {
     return storedBlock;
   }
 
+  private boolean shouldProcessOverReplicated(NumberReplicas num,
+      int pendingNum, int expectedNum) {
+    int numCurrent = num.liveReplicas() + pendingNum;
+    return numCurrent > expectedNum ||
+        (numCurrent == expectedNum && num.redundantInternalBlocks() > 0);
+  }
+
   /**
    * Invalidate corrupt replicas.
    *
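The shouldProcessOverReplicated predicate above fires in two cases: live plus pending replicas exceed the expected count, or the count is exactly met while a redundant internal block exists. A self-contained sketch of the same arithmetic with plain ints (illustrative only, not HDFS code):

```java
public class OverReplicationCheckSketch {
  // Mirrors shouldProcessOverReplicated: live + pending replicas versus the
  // expected count, with redundant internal blocks as a tie-breaker.
  static boolean shouldProcess(int live, int pending, int redundant,
      int expected) {
    int numCurrent = live + pending;
    return numCurrent > expected
        || (numCurrent == expected && redundant > 0);
  }

  public static void main(String[] args) {
    // RS-6-3 block group, 9 internal blocks expected.
    // b0..b8 all present plus a duplicate of b0: countNodes() reports
    // live = 9 and redundant = 1, so the extra replica should be removed.
    System.out.println(shouldProcess(9, 0, 1, 9)); // true
    // b0, b0, b1..b7 (b8 missing): live = 8, redundant = 1. The group is
    // under-replicated, so over-replication processing must wait until
    // reconstruction restores b8.
    System.out.println(shouldProcess(8, 0, 1, 9)); // false
  }
}
```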
@@ -3129,7 +3124,7 @@ public class BlockManager implements BlockStatsMXBean {
     // calculate current replication
     short expectedReplication = getExpectedReplicaNum(block);
     NumberReplicas num = countNodes(block);
-    int numCurrentReplica = num.liveReplicas();
+    final int numCurrentReplica = num.liveReplicas();
     // add to under-replicated queue if need to be
     if (isNeededReplication(block, numCurrentReplica)) {
       if (neededReplications.add(block, numCurrentReplica, num.readOnlyReplicas(),
@@ -3138,7 +3133,7 @@ public class BlockManager implements BlockStatsMXBean {
       }
     }
 
-    if (numCurrentReplica > expectedReplication) {
+    if (shouldProcessOverReplicated(num, 0, expectedReplication)) {
       if (num.replicasOnStaleNodes() > 0) {
         // If any of the replicas of this block are on nodes that are
         // considered "stale", then these replicas may in fact have
@@ -3666,46 +3661,94 @@ public class BlockManager implements BlockStatsMXBean {
    * Return the number of nodes hosting a given block, grouped
    * by the state of those replicas.
    * For a striped block, this includes nodes storing blocks belonging to the
-   * striped block group.
+   * striped block group. But note we exclude duplicated internal block
+   * replicas when calculating {@link NumberReplicas#liveReplicas}.
    */
-  public NumberReplicas countNodes(Block b) {
-    int decommissioned = 0;
-    int decommissioning = 0;
-    int live = 0;
-    int readonly = 0;
-    int corrupt = 0;
-    int excess = 0;
-    int stale = 0;
+  public NumberReplicas countNodes(BlockInfo b) {
+    return countNodes(b, false);
+  }
+
+  private NumberReplicas countNodes(BlockInfo b, boolean inStartupSafeMode) {
+    NumberReplicas numberReplicas = new NumberReplicas();
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
-    for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
-      if (storage.getState() == State.FAILED) {
-        continue;
-      } else if (storage.getState() == State.READ_ONLY_SHARED) {
-        readonly++;
-        continue;
-      }
-      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
-      if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
-        corrupt++;
-      } else if (node.isDecommissionInProgress()) {
-        decommissioning++;
-      } else if (node.isDecommissioned()) {
-        decommissioned++;
-      } else {
-        LightWeightHashSet<BlockInfo> blocksExcess = excessReplicateMap.get(
-            node.getDatanodeUuid());
-        if (blocksExcess != null && blocksExcess.contains(b)) {
-          excess++;
-        } else {
-          live++;
-        }
-      }
-      if (storage.areBlockContentsStale()) {
-        stale++;
+    if (b.isStriped()) {
+      countReplicasForStripedBlock(numberReplicas, (BlockInfoStriped) b,
+          nodesCorrupt, inStartupSafeMode);
+    } else {
+      for (DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
+        checkReplicaOnStorage(numberReplicas, b, storage, nodesCorrupt,
+            inStartupSafeMode);
       }
     }
-    return new NumberReplicas(live, readonly, decommissioned, decommissioning,
-        corrupt, excess, stale);
+    return numberReplicas;
+  }
+
+  private StoredReplicaState checkReplicaOnStorage(NumberReplicas counters,
+      BlockInfo b, DatanodeStorageInfo storage,
+      Collection<DatanodeDescriptor> nodesCorrupt, boolean inStartupSafeMode) {
+    final StoredReplicaState s;
+    if (storage.getState() == State.NORMAL) {
+      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
+      if (nodesCorrupt != null && nodesCorrupt.contains(node)) {
+        s = StoredReplicaState.CORRUPT;
+      } else if (inStartupSafeMode) {
+        s = StoredReplicaState.LIVE;
+        counters.add(s, 1);
+        return s;
+      } else if (node.isDecommissionInProgress()) {
+        s = StoredReplicaState.DECOMMISSIONING;
+      } else if (node.isDecommissioned()) {
+        s = StoredReplicaState.DECOMMISSIONED;
+      } else if (isExcess(node, b)) {
+        s = StoredReplicaState.EXCESS;
+      } else {
+        s = StoredReplicaState.LIVE;
+      }
+      counters.add(s, 1);
+      if (storage.areBlockContentsStale()) {
+        counters.add(StoredReplicaState.STALESTORAGE, 1);
+      }
+    } else if (!inStartupSafeMode &&
+        storage.getState() == State.READ_ONLY_SHARED) {
+      s = StoredReplicaState.READONLY;
+      counters.add(s, 1);
+    } else {
+      s = null;
+    }
+    return s;
+  }
+
+  /**
+   * For a striped block, it is possible that it contains a full number of
+   * internal blocks (i.e., 9 by default), but with duplicated replicas of
+   * the same internal block. E.g., for the following list of internal blocks
+   * b0, b0, b1, b2, b3, b4, b5, b6, b7
+   * we have 9 internal blocks but b8 is actually missing.
+   * We use this method to detect the above scenario and schedule the
+   * necessary reconstruction.
+   */
+  private void countReplicasForStripedBlock(NumberReplicas counters,
+      BlockInfoStriped block, Collection<DatanodeDescriptor> nodesCorrupt,
+      boolean inStartupSafeMode) {
+    BitSet bitSet = new BitSet(block.getTotalBlockNum());
+    for (StorageAndBlockIndex si : block.getStorageAndIndexInfos()) {
+      StoredReplicaState state = checkReplicaOnStorage(counters, block,
+          si.storage, nodesCorrupt, inStartupSafeMode);
+      if (state == StoredReplicaState.LIVE) {
+        if (!bitSet.get(si.blockIndex)) {
+          bitSet.set(si.blockIndex);
+        } else {
+          counters.subtract(StoredReplicaState.LIVE, 1);
+          counters.add(StoredReplicaState.REDUNDANT, 1);
+        }
+      }
+    }
+  }
+
+  private boolean isExcess(DatanodeDescriptor node, BlockInfo block) {
+    LightWeightHashSet<BlockInfo> blocksExcess = excessReplicateMap.get(
+        node.getDatanodeUuid());
+    return blocksExcess != null && blocksExcess.contains(block);
   }
 
   /**
@@ -3719,21 +3762,8 @@ public class BlockManager implements BlockStatsMXBean {
    * @return count of live nodes for this block
    */
   int countLiveNodes(BlockInfo b) {
-    if (!namesystem.isInStartupSafeMode()) {
-      return countNodes(b).liveReplicas();
-    }
-    // else proceed with fast case
-    int live = 0;
-    Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
-    for (DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
-      if (storage.getState() != State.NORMAL) {
-        continue;
-      }
-      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
-      if ((nodesCorrupt == null) || (!nodesCorrupt.contains(node)))
-        live++;
-    }
-    return live;
+    final boolean inStartupSafeMode = namesystem.isInStartupSafeMode();
+    return countNodes(b, inStartupSafeMode).liveReplicas();
   }
 
   /**
@@ -3752,9 +3782,8 @@ public class BlockManager implements BlockStatsMXBean {
       final BlockInfo block = it.next();
       int expectedReplication = this.getReplication(block);
       NumberReplicas num = countNodes(block);
-      int numCurrentReplica = num.liveReplicas();
-      if (numCurrentReplica > expectedReplication) {
-        // over-replicated block
+      if (shouldProcessOverReplicated(num, 0, expectedReplication)) {
+        // over-replicated block
         processOverReplicatedBlock(block, (short) expectedReplication, null,
             null);
         numOverReplicated++;
@@ -3890,7 +3919,7 @@ public class BlockManager implements BlockStatsMXBean {
           neededReplications.add(block, n.liveReplicas() + pending,
               n.readOnlyReplicas(), n.decommissionedAndDecommissioning(),
               expected);
-        } else if (n.liveReplicas() > expected) {
+        } else if (shouldProcessOverReplicated(n, 0, expected)) {
           processOverReplicatedBlock(block, expected, null, null);
         }
       }
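The BitSet bookkeeping in countReplicasForStripedBlock is the core of the fix: the first replica seen for each internal block index counts as LIVE, and any repeat of an already-seen index is reclassified as REDUNDANT. Below is a standalone sketch of that dedup logic using the b0, b0, b1..b7 example from the javadoc (plain arrays instead of HDFS types):

```java
import java.util.BitSet;

public class RedundantIndexSketch {
  public static void main(String[] args) {
    // Reported internal-block indices: two copies of b0, and b8 missing.
    byte[] reportedIndices = {0, 0, 1, 2, 3, 4, 5, 6, 7};
    int totalBlockNum = 9; // RS-6-3: 6 data + 3 parity
    BitSet bitSet = new BitSet(totalBlockNum);
    int live = 0;
    int redundant = 0;
    for (byte index : reportedIndices) {
      if (!bitSet.get(index)) {
        bitSet.set(index); // first replica of this index counts as live
        live++;
      } else {
        redundant++;       // duplicate of an index we already have
      }
    }
    // 9 replicas on disk but only 8 distinct internal blocks:
    // the namenode still needs to reconstruct b8.
    System.out.println("live=" + live + ", redundant=" + redundant);
  }
}
```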
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
index 44ae6f6a462..0198bcc3d9e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
@@ -17,59 +17,49 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import org.apache.hadoop.hdfs.util.EnumCounters;
+
+import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.CORRUPT;
+import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.DECOMMISSIONED;
+import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.DECOMMISSIONING;
+import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.EXCESS;
+import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.LIVE;
+import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.REDUNDANT;
+import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.STALESTORAGE;
+import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.READONLY;
+
 /**
  * A immutable object that stores the number of live replicas and
  * the number of decommissioned Replicas.
  */
-public class NumberReplicas {
-  private int liveReplicas;
-  private int readOnlyReplicas;
+public class NumberReplicas extends EnumCounters<NumberReplicas.StoredReplicaState> {
 
-  // Tracks only the decommissioning replicas
-  private int decommissioning;
-  // Tracks only the decommissioned replicas
-  private int decommissioned;
-  private int corruptReplicas;
-  private int excessReplicas;
-  private int replicasOnStaleNodes;
-
-  NumberReplicas() {
-    this(0, 0, 0, 0, 0, 0, 0);
+  public enum StoredReplicaState {
+    // live replicas. for a striped block, this value excludes redundant
+    // replicas for the same internal block
+    LIVE,
+    READONLY,
+    DECOMMISSIONING,
+    DECOMMISSIONED,
+    CORRUPT,
+    // excess replicas already tracked by blockmanager's excess map
+    EXCESS,
+    STALESTORAGE,
+    // for striped blocks only. number of redundant internal block replicas
+    // that have not been tracked by blockmanager yet (i.e., not in excess)
+    REDUNDANT
   }
 
-  NumberReplicas(int live, int readonly, int decommissioned,
-      int decommissioning, int corrupt, int excess, int stale) {
-    set(live, readonly, decommissioned, decommissioning, corrupt, excess, stale);
-  }
-
-  void set(int live, int readonly, int decommissioned, int decommissioning,
-      int corrupt, int excess, int stale) {
-    liveReplicas = live;
-    readOnlyReplicas = readonly;
-    this.decommissioning = decommissioning;
-    this.decommissioned = decommissioned;
-    corruptReplicas = corrupt;
-    excessReplicas = excess;
-    replicasOnStaleNodes = stale;
+  public NumberReplicas() {
+    super(StoredReplicaState.class);
   }
 
   public int liveReplicas() {
-    return liveReplicas;
+    return (int) get(LIVE);
   }
 
   public int readOnlyReplicas() {
-    return readOnlyReplicas;
-  }
-
-  /**
-   *
-   * @return decommissioned replicas + decommissioning replicas
-   * It is deprecated by decommissionedAndDecommissioning
-   * due to its misleading name.
-   */
-  @Deprecated
-  public int decommissionedReplicas() {
-    return decommissionedAndDecommissioning();
+    return (int) get(READONLY);
   }
 
   /**
@@ -77,7 +67,7 @@ public class NumberReplicas {
    * @return decommissioned and decommissioning replicas
    */
   public int decommissionedAndDecommissioning() {
-    return decommissioned + decommissioning;
+    return (int) (get(DECOMMISSIONED) + get(DECOMMISSIONING));
   }
 
   /**
@@ -85,7 +75,7 @@ public class NumberReplicas {
    * @return decommissioned replicas only
    */
   public int decommissioned() {
-    return decommissioned;
+    return (int) get(DECOMMISSIONED);
   }
 
   /**
@@ -93,15 +83,15 @@ public class NumberReplicas {
    * @return decommissioning replicas only
   */
   public int decommissioning() {
-    return decommissioning;
+    return (int) get(DECOMMISSIONING);
   }
 
   public int corruptReplicas() {
-    return corruptReplicas;
+    return (int) get(CORRUPT);
   }
 
   public int excessReplicas() {
-    return excessReplicas;
+    return (int) get(EXCESS);
   }
 
   /**
@@ -110,6 +100,10 @@ public class NumberReplicas {
    * replica may count as both "live" and "stale".
    */
   public int replicasOnStaleNodes() {
-    return replicasOnStaleNodes;
+    return (int) get(STALESTORAGE);
+  }
+
+  public int redundantInternalBlocks() {
+    return (int) get(REDUNDANT);
   }
 }
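With NumberReplicas now extending EnumCounters<StoredReplicaState>, all per-state tallies share the same add/subtract/get counter slots used by the BlockManager changes above. A small sketch of the resulting call pattern (standalone usage for illustration only; in practice only the block manager mutates these counters):

```java
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState;

public class NumberReplicasSketch {
  public static void main(String[] args) {
    NumberReplicas num = new NumberReplicas();
    // Nine replicas initially classified as live...
    num.add(StoredReplicaState.LIVE, 9);
    // ...then one turns out to duplicate an existing internal block, so it
    // is moved from LIVE to REDUNDANT, exactly as
    // countReplicasForStripedBlock does.
    num.subtract(StoredReplicaState.LIVE, 1);
    num.add(StoredReplicaState.REDUNDANT, 1);
    System.out.println(num.liveReplicas());            // 8
    System.out.println(num.redundantInternalBlocks()); // 1
  }
}
```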
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManagerSafeMode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManagerSafeMode.java
index d9da2c159fa..cb749c7e2d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManagerSafeMode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManagerSafeMode.java
@@ -437,7 +437,7 @@ public class TestBlockManagerSafeMode {
     doReturn(storedBlock).when(bm).getStoredBlock(any(Block.class));
     NumberReplicas numberReplicas = mock(NumberReplicas.class);
     when(numberReplicas.liveReplicas()).thenReturn(0);
-    doReturn(numberReplicas).when(bm).countNodes(any(Block.class));
+    doReturn(numberReplicas).when(bm).countNodes(any(BlockInfo.class));
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
index eba6b87d3ab..7b281a6ef38 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAddOverReplicatedStripedBlocks.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -40,9 +41,11 @@ import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class TestAddOverReplicatedStripedBlocks {
 
@@ -68,6 +71,7 @@ public class TestAddOverReplicatedStripedBlocks {
     // disable block recovery
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
     SimulatedFSDataset.setFactory(conf);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
     cluster.waitActive();
@@ -201,26 +205,35 @@ public class TestAddOverReplicatedStripedBlocks {
     } finally {
       cluster.getNamesystem().writeUnlock();
     }
-    assertEquals(1, bm.countNodes(blockInfo).corruptReplicas());
+    assertEquals(1, bm.countNodes(bm.getStoredBlock(blockInfo))
+        .corruptReplicas());
 
     // let a internal block be over replicated with 2 redundant block.
     blk.setBlockId(groupId + 2);
     cluster.injectBlocks(numDNs - 3, Arrays.asList(blk), bpid);
     cluster.injectBlocks(numDNs - 2, Arrays.asList(blk), bpid);
 
-    // update blocksMap
-    cluster.triggerBlockReports();
-    // add to invalidates
-    cluster.triggerHeartbeats();
-    // datanode delete block
-    cluster.triggerHeartbeats();
     // update blocksMap
     cluster.triggerBlockReports();
 
-    // verify that all internal blocks exists
-    lbs = cluster.getNameNodeRpc().getBlockLocations(
-        filePath.toString(), 0, fileLen);
-    StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE);
+    // verify that all internal blocks exist except b0;
+    // the redundant internal blocks will not be deleted before the corrupted
+    // block gets reconstructed, but since we set
+    // DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY to 0, the reconstruction will
+    // not happen
+    lbs = cluster.getNameNodeRpc().getBlockLocations(filePath.toString(), 0,
+        fileLen);
+    bg = (LocatedStripedBlock) (lbs.get(0));
+    assertEquals(GROUP_SIZE + 1, bg.getBlockIndices().length);
+    assertEquals(GROUP_SIZE + 1, bg.getLocations().length);
+    BitSet set = new BitSet(GROUP_SIZE);
+    for (byte index : bg.getBlockIndices()) {
+      set.set(index);
+    }
+    Assert.assertFalse(set.get(0));
+    for (int i = 1; i < GROUP_SIZE; i++) {
+      assertTrue(set.get(i));
+    }
   }
 
   @Test
@@ -260,11 +273,21 @@ public class TestAddOverReplicatedStripedBlocks {
 
     // update blocksMap
     cluster.triggerBlockReports();
 
-    // Since one block is missing, when over-replicated blocks got deleted,
-    // we are left GROUP_SIZE - 1 blocks.
-    lbs = cluster.getNameNodeRpc().getBlockLocations(
-        filePath.toString(), 0, fileLen);
-    StripedFileTestUtil.verifyLocatedStripedBlocks(lbs, GROUP_SIZE - 1);
+    // Since one block is missing, the over-replicated blocks will not be
+    // deleted until reconstruction happens
+    lbs = cluster.getNameNodeRpc().getBlockLocations(filePath.toString(), 0,
+        fileLen);
+    bg = (LocatedStripedBlock) (lbs.get(0));
+    assertEquals(GROUP_SIZE + 1, bg.getBlockIndices().length);
+    assertEquals(GROUP_SIZE + 1, bg.getLocations().length);
+    BitSet set = new BitSet(GROUP_SIZE);
+    for (byte index : bg.getBlockIndices()) {
+      set.set(index);
+    }
+    Assert.assertFalse(set.get(GROUP_SIZE - 1));
+    for (int i = 0; i < GROUP_SIZE - 1; i++) {
+      assertTrue(set.get(i));
+    }
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java
index 0bbc220a74c..1c2dc913650 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestReconstructStripedBlocks.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
@@ -36,11 +37,18 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.BitSet;
 import java.util.List;
 
 import static org.apache.hadoop.hdfs.StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
@@ -51,6 +59,8 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class TestReconstructStripedBlocks {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      TestReconstructStripedBlocks.class);
   private static final int cellSize = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
   private final short GROUP_SIZE =
       (short) (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS);
@@ -233,4 +243,97 @@ public class TestReconstructStripedBlocks {
     }
     return count;
   }
+
+  /**
+   * Make sure the NN can detect the scenario where there are enough internal
+   * blocks (>= 9 by default) but a data/parity block is still missing.
+   */
+  @Test
+  public void testCountLiveReplicas() throws Exception {
+    final HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
+        false);
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(GROUP_SIZE + 2)
+        .build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+
+    try {
+      fs.mkdirs(dirPath);
+      fs.setErasureCodingPolicy(dirPath, null);
+      DFSTestUtil.createFile(fs, filePath,
+          BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS * 2, (short) 1, 0L);
+
+      // stop a dn
+      LocatedBlocks blks = fs.getClient().getLocatedBlocks(filePath.toString(), 0);
+      LocatedStripedBlock block = (LocatedStripedBlock) blks.getLastLocatedBlock();
+      DatanodeInfo dnToStop = block.getLocations()[0];
+      MiniDFSCluster.DataNodeProperties dnProp =
+          cluster.stopDataNode(dnToStop.getXferAddr());
+      cluster.setDataNodeDead(dnToStop);
+
+      // wait for reconstruction to happen
+      DFSTestUtil.waitForReplication(fs, filePath, GROUP_SIZE, 15 * 1000);
+
+      // bring the dn back: 10 internal blocks now
+      cluster.restartDataNode(dnProp);
+      cluster.waitActive();
+
+      // stop another dn: 9 internal blocks, but only 8 distinct ones
+      dnToStop = block.getLocations()[1];
+      cluster.stopDataNode(dnToStop.getXferAddr());
+      cluster.setDataNodeDead(dnToStop);
+
+      // currently namenode is able to track the missing block. but restart NN
+      cluster.restartNameNode(true);
+
+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.triggerBlockReport(dn);
+      }
+
+      FSNamesystem fsn = cluster.getNamesystem();
+      BlockManager bm = fsn.getBlockManager();
+
+      Thread.sleep(3000); // wait 3 running cycles of replication monitor
+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.triggerHeartbeat(dn);
+      }
+
+      // check if NN can detect the missing internal block and finish the
+      // reconstruction
+      boolean reconstructed = false;
+      for (int i = 0; i < 5; i++) {
+        NumberReplicas num = null;
+        fsn.readLock();
+        try {
+          BlockInfo blockInfo = cluster.getNamesystem().getFSDirectory()
+              .getINode4Write(filePath.toString()).asFile().getLastBlock();
+          num = bm.countNodes(blockInfo);
+        } finally {
+          fsn.readUnlock();
+        }
+        if (num.liveReplicas() >= GROUP_SIZE) {
+          reconstructed = true;
+          break;
+        } else {
+          Thread.sleep(1000);
+        }
+      }
+      Assert.assertTrue(reconstructed);
+
+      blks = fs.getClient().getLocatedBlocks(filePath.toString(), 0);
+      block = (LocatedStripedBlock) blks.getLastLocatedBlock();
+      BitSet bitSet = new BitSet(GROUP_SIZE);
+      for (byte index : block.getBlockIndices()) {
+        bitSet.set(index);
+      }
+      for (int i = 0; i < GROUP_SIZE; i++) {
+        Assert.assertTrue(bitSet.get(i));
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }