diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index d9a678f166f..23f6b223bb0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -474,6 +474,9 @@ Release 2.0.3-alpha - Unreleased
 
     HDFS-4055. TestAuditLogs is flaky. (Binglin Chang via eli)
 
+    HDFS-4072. On file deletion remove corresponding blocks pending
+    replications. (Jing Zhao via suresh)
+
 Release 2.0.2-alpha - 2012-09-07
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 822aeff65c5..57529a74bb9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -288,7 +288,7 @@ public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
   }
 
   private static BlockTokenSecretManager createBlockTokenSecretManager(
-      final Configuration conf) throws IOException {
+      final Configuration conf) {
     final boolean isEnabled = conf.getBoolean(
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY,
         DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT);
@@ -1260,7 +1260,7 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
           // Move the block-replication into a "pending" state.
           // The reason we use 'pending' is so we can retry
           // replications that fail after an appropriate amount of time.
-          pendingReplications.add(block, targets.length);
+          pendingReplications.increment(block, targets.length);
           if(NameNode.stateChangeLog.isDebugEnabled()) {
             NameNode.stateChangeLog.debug(
                 "BLOCK* block " + block
@@ -1306,8 +1306,11 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
 
   /**
    * Choose target datanodes according to the replication policy.
-   * @throws IOException if the number of targets < minimum replication.
-   * @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor, HashMap, long)
+   *
+   * @throws IOException
+   *           if the number of targets < minimum replication.
+   * @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor,
+   *      List, boolean, HashMap, long)
    */
   public DatanodeDescriptor[] chooseTarget(final String src,
       final int numOfReplicas, final DatanodeDescriptor client,
@@ -1811,7 +1814,7 @@ private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
 
   /**
    * Queue the given reported block for later processing in the
-   * standby node. {@see PendingDataNodeMessages}.
+   * standby node. @see PendingDataNodeMessages.
    * @param reason a textual reason to report in the debug logs
    */
   private void queueReportedBlock(DatanodeDescriptor dn, Block block,
@@ -1976,14 +1979,15 @@ void addStoredBlockUnderConstruction(
   }
 
   /**
-   * Faster version of {@link addStoredBlock()}, intended for use with
-   * initial block report at startup. If not in startup safe mode, will
-   * call standard addStoredBlock().
-   * Assumes this method is called "immediately" so there is no need to
-   * refresh the storedBlock from blocksMap.
-   * Doesn't handle underReplication/overReplication, or worry about
+   * Faster version of
+   * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, DatanodeDescriptor, boolean)}
+   * , intended for use with initial block report at startup. If not in startup
+   * safe mode, will call standard addStoredBlock(). Assumes this method is
+   * called "immediately" so there is no need to refresh the storedBlock from
+   * blocksMap. Doesn't handle underReplication/overReplication, or worry about
    * pendingReplications or corruptReplicas, because it's in startup safe mode.
    * Doesn't log every block, because there are typically millions of them.
+   *
+   * @throws IOException
    */
   private void addStoredBlockImmediate(BlockInfo storedBlock,
@@ -2505,7 +2509,7 @@ void addBlock(DatanodeDescriptor node, Block block, String delHint)
     //
     // Modify the blocks->datanode map and node's map.
     //
-    pendingReplications.remove(block);
+    pendingReplications.decrement(block);
     processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED,
         delHintNode);
   }
@@ -2641,7 +2645,7 @@ public NumberReplicas countNodes(Block b) {
   }
 
   /**
-   * Simpler, faster form of {@link countNodes()} that only returns the number
+   * Simpler, faster form of {@link #countNodes(Block)} that only returns the number
    * of live nodes. If in startup safemode (or its 30-sec extension period),
    * then it gains speed by ignoring issues of excess replicas or nodes
    * that are decommissioned or in process of becoming decommissioned.
@@ -2790,6 +2794,8 @@ public void removeBlock(Block block) {
     addToInvalidates(block);
     corruptReplicas.removeFromCorruptReplicasMap(block);
     blocksMap.removeBlock(block);
+    // Remove the block from pendingReplications
+    pendingReplications.remove(block);
     if (postponedMisreplicatedBlocks.remove(block)) {
       postponedMisreplicatedBlocksCount--;
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
index 7c6bef29b81..048968d12af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
@@ -72,7 +72,7 @@ void start() {
   /**
    * Add a block to the list of pending Replications
    */
-  void add(Block block, int numReplicas) {
+  void increment(Block block, int numReplicas) {
     synchronized (pendingReplications) {
       PendingBlockInfo found = pendingReplications.get(block);
       if (found == null) {
@@ -89,7 +89,7 @@ void add(Block block, int numReplicas) {
    * Decrement the number of pending replication requests
    * for this block.
    */
-  void remove(Block block) {
+  void decrement(Block block) {
     synchronized (pendingReplications) {
       PendingBlockInfo found = pendingReplications.get(block);
       if (found != null) {
@@ -104,6 +104,16 @@ void remove(Block block) {
     }
   }
 
+  /**
+   * Remove the record about the given block from pendingReplications.
+   * @param block The given block whose pending replication requests need to be
+   *          removed
+   */
+  void remove(Block block) {
+    synchronized (pendingReplications) {
+      pendingReplications.remove(block);
+    }
+  }
 
   public void clear() {
     synchronized (pendingReplications) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
index dc390d2e593..b83e371fe13 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
@@ -20,14 +20,30 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.junit.Test;
 
 /**
- * This class tests the internals of PendingReplicationBlocks.java
+ * This class tests the internals of PendingReplicationBlocks.java,
+ * as well as how PendingReplicationBlocks acts in BlockManager
  */
 public class TestPendingReplication {
   final static int TIMEOUT = 3;     // 3 seconds
+  private static final int DFS_REPLICATION_INTERVAL = 1;
+  // Number of datanodes in the cluster
+  private static final int DATANODE_COUNT = 5;
 
   @Test
   public void testPendingReplication() {
@@ -40,7 +56,7 @@ public void testPendingReplication() {
     //
     for (int i = 0; i < 10; i++) {
       Block block = new Block(i, i, 0);
-      pendingReplications.add(block, i);
+      pendingReplications.increment(block, i);
     }
     assertEquals("Size of pendingReplications ",
                  10, pendingReplications.size());
@@ -50,15 +66,15 @@ public void testPendingReplication() {
     // remove one item and reinsert it
     //
     Block blk = new Block(8, 8, 0);
-    pendingReplications.remove(blk);             // removes one replica
+    pendingReplications.decrement(blk);          // removes one replica
     assertEquals("pendingReplications.getNumReplicas ",
                  7, pendingReplications.getNumReplicas(blk));
 
     for (int i = 0; i < 7; i++) {
-      pendingReplications.remove(blk);           // removes all replicas
+      pendingReplications.decrement(blk);        // removes all replicas
     }
     assertTrue(pendingReplications.size() == 9);
-    pendingReplications.add(blk, 8);
+    pendingReplications.increment(blk, 8);
     assertTrue(pendingReplications.size() == 10);
 
     //
@@ -86,7 +102,7 @@ public void testPendingReplication() {
 
     for (int i = 10; i < 15; i++) {
       Block block = new Block(i, i, 0);
-      pendingReplications.add(block, i);
+      pendingReplications.increment(block, i);
     }
     assertTrue(pendingReplications.size() == 15);
 
@@ -116,4 +132,70 @@ public void testPendingReplication() {
     }
     pendingReplications.stop();
   }
+
+  /**
+   * Test if BlockManager can correctly remove corresponding pending records
+   * when a file is deleted
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testPendingAndInvalidate() throws Exception {
+    final Configuration CONF = new HdfsConfiguration();
+    CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+        DFS_REPLICATION_INTERVAL);
+    CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
+        DFS_REPLICATION_INTERVAL);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(
+        DATANODE_COUNT).build();
+    cluster.waitActive();
+
+    FSNamesystem namesystem = cluster.getNamesystem();
+    BlockManager bm = namesystem.getBlockManager();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    try {
+      // 1. create a file
+      Path filePath = new Path("/tmp.txt");
+      DFSTestUtil.createFile(fs, filePath, 1024, (short) 3, 0L);
+
+      // 2. disable the heartbeats
+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+      }
+
+      // 3. mark a couple of blocks as corrupt
+      LocatedBlock block = NameNodeAdapter.getBlockLocations(
+          cluster.getNameNode(), filePath.toString(), 0, 1).get(0);
+      cluster.getNamesystem().writeLock();
+      try {
+        bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0],
+            "TEST");
+        bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[1],
+            "TEST");
+      } finally {
+        cluster.getNamesystem().writeUnlock();
+      }
+      BlockManagerTestUtil.computeAllPendingWork(bm);
+      BlockManagerTestUtil.updateState(bm);
+      assertEquals(bm.getPendingReplicationBlocksCount(), 1L);
+      assertEquals(bm.pendingReplications.getNumReplicas(block.getBlock()
+          .getLocalBlock()), 2);
+
+      // 4. delete the file
+      fs.delete(filePath, true);
+      // retry at most 10 times, each time sleep for 1s. Note that 10s is much
+      // less than the default pending record timeout (5~10min)
+      int retries = 10;
+      long pendingNum = bm.getPendingReplicationBlocksCount();
+      while (pendingNum != 0 && retries-- > 0) {
+        Thread.sleep(1000); // let NN do the deletion
+        BlockManagerTestUtil.updateState(bm);
+        pendingNum = bm.getPendingReplicationBlocksCount();
+      }
+      assertEquals(pendingNum, 0L);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
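
Reviewer note: the sketch below is illustrative Java, not part of the patch, and every name in it is hypothetical. It models the semantics behind the rename with a plain HashMap so the API split is easy to see at a glance: increment() (formerly add) schedules more pending replications for a block, decrement() (formerly remove) records one completed replication, and the new remove() discards the whole record, which is what BlockManager.removeBlock() now does on file deletion so a deleted file's pending entries cannot linger until the replication timeout. Hadoop's real PendingReplicationBlocks also tracks timestamps and timed-out blocks, which this sketch deliberately omits.

// PendingReplicationsSketch.java -- hypothetical stand-in, not Hadoop code.
import java.util.HashMap;
import java.util.Map;

public class PendingReplicationsSketch {
  // blockId -> number of replications still in flight
  private final Map<Long, Integer> pending = new HashMap<>();

  // Formerly "add": schedule numReplicas more pending replications.
  synchronized void increment(long blockId, int numReplicas) {
    pending.merge(blockId, numReplicas, Integer::sum);
  }

  // Formerly "remove": one replication completed, decrement the count.
  synchronized void decrement(long blockId) {
    Integer n = pending.get(blockId);
    if (n == null) {
      return;                     // no record for this block
    }
    if (n <= 1) {
      pending.remove(blockId);    // last pending replication finished
    } else {
      pending.put(blockId, n - 1);
    }
  }

  // New in this patch: the file was deleted, so drop the record outright.
  synchronized void remove(long blockId) {
    pending.remove(blockId);
  }

  synchronized int getNumReplicas(long blockId) {
    return pending.getOrDefault(blockId, 0);
  }

  public static void main(String[] args) {
    PendingReplicationsSketch p = new PendingReplicationsSketch();
    p.increment(1L, 2);                        // two replications scheduled
    p.decrement(1L);                           // one of them completes
    System.out.println(p.getNumReplicas(1L));  // prints 1
    p.remove(1L);                              // file deleted: discard record
    System.out.println(p.getNumReplicas(1L));  // prints 0
  }
}

Keeping increment/decrement distinct from remove mirrors the patch's design: per-replica bookkeeping for normal replication traffic, and a separate wholesale cleanup path for deletion.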